From 1cd936579db9a7c060c012d341d2207e606de7b7 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 6 Jul 2017 16:54:55 +0300 Subject: [PATCH] introduce IColumn::gather() [#CLICKHOUSE-3118] --- dbms/src/Columns/ColumnAggregateFunction.cpp | 8 ++- dbms/src/Columns/ColumnAggregateFunction.h | 2 + dbms/src/Columns/ColumnArray.cpp | 8 ++- dbms/src/Columns/ColumnArray.h | 2 + dbms/src/Columns/ColumnConst.h | 5 ++ .../Columns/ColumnConstAggregateFunction.h | 5 ++ dbms/src/Columns/ColumnFixedString.cpp | 11 +++- dbms/src/Columns/ColumnFixedString.h | 2 + dbms/src/Columns/ColumnNullable.cpp | 6 ++ dbms/src/Columns/ColumnNullable.h | 2 + dbms/src/Columns/ColumnString.cpp | 7 +++ dbms/src/Columns/ColumnString.h | 2 + dbms/src/Columns/ColumnTuple.cpp | 6 ++ dbms/src/Columns/ColumnTuple.h | 1 + dbms/src/Columns/ColumnVector.cpp | 8 +++ dbms/src/Columns/ColumnVector.h | 1 + dbms/src/Columns/IColumn.h | 8 ++- dbms/src/Columns/IColumnDummy.h | 5 ++ dbms/src/DataStreams/ColumnGathererStream.cpp | 55 +--------------- dbms/src/DataStreams/ColumnGathererStream.h | 62 ++++++++++++++++++- 20 files changed, 148 insertions(+), 58 deletions(-) diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index eb1326ba20..0f93c9a70b 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -1,5 +1,6 @@ -#include #include +#include +#include #include namespace DB @@ -340,6 +341,11 @@ void ColumnAggregateFunction::getPermutation(bool reverse, size_t limit, int nan res[i] = i; } +void ColumnAggregateFunction::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const { throw Exception("Method getExtremes is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/Columns/ColumnAggregateFunction.h b/dbms/src/Columns/ColumnAggregateFunction.h index fe71faff9e..3916c5a151 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.h +++ b/dbms/src/Columns/ColumnAggregateFunction.h @@ -151,6 +151,8 @@ public: Columns scatter(ColumnIndex num_columns, const Selector & selector) const override; + void gather(ColumnGathererStream & gatherer_stream) override; + int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override { return 0; diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 9d94613040..4ceed322b9 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -7,6 +7,8 @@ #include #include +#include + #include #include #include @@ -608,7 +610,6 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const return res; } - void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const { size_t s = size(); @@ -888,4 +889,9 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets_t & replicate_offsets) const } +void ColumnArray::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + } diff --git a/dbms/src/Columns/ColumnArray.h b/dbms/src/Columns/ColumnArray.h index 7c5d971e31..82015b6aff 100644 --- a/dbms/src/Columns/ColumnArray.h +++ b/dbms/src/Columns/ColumnArray.h @@ -80,6 +80,8 @@ public: return scatterImpl(num_columns, selector); } + void gather(ColumnGathererStream & gatherer_stream) override; + private: ColumnPtr data; ColumnPtr offsets; /// Displacements can be shared across multiple columns - to implement nested data structures. diff --git a/dbms/src/Columns/ColumnConst.h b/dbms/src/Columns/ColumnConst.h index 72a6003438..41536a04d9 100644 --- a/dbms/src/Columns/ColumnConst.h +++ b/dbms/src/Columns/ColumnConst.h @@ -207,6 +207,11 @@ public: res[i] = i; } + void gather(ColumnGathererStream &) override + { + throw Exception("Cannot gather into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + DataTypePtr & getDataType() { return data_type; } const DataTypePtr & getDataType() const { return data_type; } }; diff --git a/dbms/src/Columns/ColumnConstAggregateFunction.h b/dbms/src/Columns/ColumnConstAggregateFunction.h index 2c89d1786a..400a7afcec 100644 --- a/dbms/src/Columns/ColumnConstAggregateFunction.h +++ b/dbms/src/Columns/ColumnConstAggregateFunction.h @@ -82,6 +82,11 @@ public: ColumnPtr replicate(const Offsets_t & offsets) const override; + void gather(ColumnGathererStream &) override + { + throw Exception("Cannot gather into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + void getExtremes(Field & min, Field & max) const override; size_t byteSize() const override; diff --git a/dbms/src/Columns/ColumnFixedString.cpp b/dbms/src/Columns/ColumnFixedString.cpp index 6aeee31e18..7e76195980 100644 --- a/dbms/src/Columns/ColumnFixedString.cpp +++ b/dbms/src/Columns/ColumnFixedString.cpp @@ -1,10 +1,12 @@ +#include + #include #include #include -#include +#include -#include +#include #if __SSE2__ #include @@ -278,6 +280,11 @@ ColumnPtr ColumnFixedString::replicate(const Offsets_t & offsets) const return res; } +void ColumnFixedString::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + void ColumnFixedString::getExtremes(Field & min, Field & max) const { min = String(); diff --git a/dbms/src/Columns/ColumnFixedString.h b/dbms/src/Columns/ColumnFixedString.h index 8331544432..9de5a61140 100644 --- a/dbms/src/Columns/ColumnFixedString.h +++ b/dbms/src/Columns/ColumnFixedString.h @@ -120,6 +120,8 @@ public: return scatterImpl(num_columns, selector); } + void gather(ColumnGathererStream & gatherer_stream) override; + void reserve(size_t size) override { chars.reserve(n * size); diff --git a/dbms/src/Columns/ColumnNullable.cpp b/dbms/src/Columns/ColumnNullable.cpp index a155b38217..48f5fa101d 100644 --- a/dbms/src/Columns/ColumnNullable.cpp +++ b/dbms/src/Columns/ColumnNullable.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -298,6 +299,11 @@ void ColumnNullable::getPermutation(bool reverse, size_t limit, int null_directi } } +void ColumnNullable::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + void ColumnNullable::reserve(size_t n) { nested_column->reserve(n); diff --git a/dbms/src/Columns/ColumnNullable.h b/dbms/src/Columns/ColumnNullable.h index 5fa7cb8be3..090f48e63f 100644 --- a/dbms/src/Columns/ColumnNullable.h +++ b/dbms/src/Columns/ColumnNullable.h @@ -67,6 +67,8 @@ public: return scatterImpl(num_columns, selector); } + void gather(ColumnGathererStream & gatherer_stream) override; + /// Return the column that represents values. ColumnPtr & getNestedColumn() { return nested_column; } const ColumnPtr & getNestedColumn() const { return nested_column; } diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index f109838bdf..cfe0097f18 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -247,6 +248,12 @@ ColumnPtr ColumnString::replicate(const Offsets_t & replicate_offsets) const } +void ColumnString::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + + void ColumnString::reserve(size_t n) { offsets.reserve(n); diff --git a/dbms/src/Columns/ColumnString.h b/dbms/src/Columns/ColumnString.h index c99a70b149..dcd35cf912 100644 --- a/dbms/src/Columns/ColumnString.h +++ b/dbms/src/Columns/ColumnString.h @@ -238,6 +238,8 @@ public: return scatterImpl(num_columns, selector); } + void gather(ColumnGathererStream & gatherer_stream) override; + void reserve(size_t n) override; void getExtremes(Field & min, Field & max) const override; diff --git a/dbms/src/Columns/ColumnTuple.cpp b/dbms/src/Columns/ColumnTuple.cpp index 40dddf233c..2162f0e6c9 100644 --- a/dbms/src/Columns/ColumnTuple.cpp +++ b/dbms/src/Columns/ColumnTuple.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace DB @@ -231,6 +232,11 @@ void ColumnTuple::getPermutation(bool reverse, size_t limit, int nan_direction_h } } +void ColumnTuple::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + void ColumnTuple::reserve(size_t n) { for (auto & column : columns) diff --git a/dbms/src/Columns/ColumnTuple.h b/dbms/src/Columns/ColumnTuple.h index 643e3ffa71..a54f8845d6 100644 --- a/dbms/src/Columns/ColumnTuple.h +++ b/dbms/src/Columns/ColumnTuple.h @@ -51,6 +51,7 @@ public: ColumnPtr permute(const Permutation & perm, size_t limit) const override; ColumnPtr replicate(const Offsets_t & offsets) const override; Columns scatter(ColumnIndex num_columns, const Selector & selector) const override; + void gather(ColumnGathererStream & gatherer_stream) override; int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override; void getExtremes(Field & min, Field & max) const override; void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override; diff --git a/dbms/src/Columns/ColumnVector.cpp b/dbms/src/Columns/ColumnVector.cpp index 21f89514da..dde5619da1 100644 --- a/dbms/src/Columns/ColumnVector.cpp +++ b/dbms/src/Columns/ColumnVector.cpp @@ -11,6 +11,8 @@ #include +#include + #include #if __SSE2__ @@ -255,6 +257,12 @@ ColumnPtr ColumnVector::replicate(const IColumn::Offsets_t & offsets) const return res; } +template +void ColumnVector::gather(ColumnGathererStream & gatherer) +{ + gatherer.gather(*this); +} + template void ColumnVector::getExtremes(Field & min, Field & max) const { diff --git a/dbms/src/Columns/ColumnVector.h b/dbms/src/Columns/ColumnVector.h index b332119128..bc13346f44 100644 --- a/dbms/src/Columns/ColumnVector.h +++ b/dbms/src/Columns/ColumnVector.h @@ -241,6 +241,7 @@ public: return this->scatterImpl(num_columns, selector); } + void gather(ColumnGathererStream & gatherer_stream) override; /** More efficient methods of manipulation - to manipulate with data directly. */ Container_t & getData() diff --git a/dbms/src/Columns/IColumn.h b/dbms/src/Columns/IColumn.h index 59e9e0dccd..93e9ea5f31 100644 --- a/dbms/src/Columns/IColumn.h +++ b/dbms/src/Columns/IColumn.h @@ -31,7 +31,7 @@ using ColumnPlainPtrs = std::vector; using ConstColumnPlainPtrs = std::vector; class Arena; - +class ColumnGathererStream; /// Declares interface to store columns in memory. class IColumn : private boost::noncopyable @@ -225,6 +225,12 @@ public: using Selector = PaddedPODArray; virtual Columns scatter(ColumnIndex num_columns, const Selector & selector) const = 0; + /// Insert data from several other columns according to source mask (used in vertical merge). + /// For now it is a helper to de-virtualize calls to insert*() functions inside gather loop + /// (descendants should call gatherer_stream.gather(*this) to implement this function.) + /// TODO: interface decoupled from ColumnGathererStream that allows non-generic specializations. + virtual void gather(ColumnGathererStream & gatherer_stream) = 0; + /** Computes minimum and maximum element of the column. * In addition to numeric types, the funtion is completely implemented for Date and DateTime. * For strings and arrays function should retrurn default value. diff --git a/dbms/src/Columns/IColumnDummy.h b/dbms/src/Columns/IColumnDummy.h index 6b83f7b725..337e591079 100644 --- a/dbms/src/Columns/IColumnDummy.h +++ b/dbms/src/Columns/IColumnDummy.h @@ -103,6 +103,11 @@ public: return res; } + void gather(ColumnGathererStream &) override + { + throw Exception("Method gather is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + void getExtremes(Field & min, Field & max) const override { throw Exception("Method getExtremes is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED); diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp index 389b78b23d..dffa84ec90 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.cpp +++ b/dbms/src/DataStreams/ColumnGathererStream.cpp @@ -78,61 +78,12 @@ Block ColumnGathererStream::readImpl() if (pos_global_start >= row_source.size()) return Block(); - Block block_res{column.cloneEmpty()}; + block_res = Block{column.cloneEmpty()}; IColumn & column_res = *block_res.getByPosition(0).column; - size_t global_size = row_source.size(); - size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size); - column_res.reserve(curr_block_preferred_size); + column_res.gather(*this); - size_t pos_global = pos_global_start; - while (pos_global < global_size && column_res.size() < curr_block_preferred_size) - { - auto source_data = row_source[pos_global].getData(); - bool source_skip = row_source[pos_global].getSkipFlag(); - auto source_num = row_source[pos_global].getSourceNum(); - Source & source = sources[source_num]; - - if (source.pos >= source.size) /// Fetch new block from source_num part - { - fetchNewBlock(source, source_num); - } - - /// Consecutive optimization. TODO: precompute lens - size_t len = 1; - size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block - for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len); - - if (!source_skip) - { - /// Whole block could be produced via copying pointer from current block - if (source.pos == 0 && source.size == len) - { - /// If current block already contains data, return it. We will be here again on next read() iteration. - if (column_res.size() != 0) - break; - - block_res.getByPosition(0).column = source.block.getByName(name).column; - source.pos += len; - pos_global += len; - break; - } - else if (len == 1) - { - column_res.insertFrom(*source.column, source.pos); - } - else - { - column_res.insertRangeFrom(*source.column, source.pos, len); - } - } - - source.pos += len; - pos_global += len; - } - pos_global_start = pos_global; - - return block_res; + return std::move(block_res); } diff --git a/dbms/src/DataStreams/ColumnGathererStream.h b/dbms/src/DataStreams/ColumnGathererStream.h index 469ff19746..b74d25c185 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.h +++ b/dbms/src/DataStreams/ColumnGathererStream.h @@ -70,8 +70,11 @@ public: void readSuffixImpl() override; -private: + /// for use in implementations of IColumn::gather() + template + void gather(Column & column_res); +private: String name; ColumnWithTypeAndName column; const MergedRowSources & row_source; @@ -105,7 +108,64 @@ private: size_t pos_global_start = 0; size_t block_preferred_size; + Block block_res; + Poco::Logger * log; }; +template +void ColumnGathererStream::gather(Column & column_res) +{ + size_t global_size = row_source.size(); + size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size); + column_res.reserve(curr_block_preferred_size); + + size_t pos_global = pos_global_start; + while (pos_global < global_size && column_res.size() < curr_block_preferred_size) + { + auto source_data = row_source[pos_global].getData(); + bool source_skip = row_source[pos_global].getSkipFlag(); + auto source_num = row_source[pos_global].getSourceNum(); + Source & source = sources[source_num]; + + if (source.pos >= source.size) /// Fetch new block from source_num part + { + fetchNewBlock(source, source_num); + } + + /// Consecutive optimization. TODO: precompute lens + size_t len = 1; + size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block + for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len); + + if (!source_skip) + { + /// Whole block could be produced via copying pointer from current block + if (source.pos == 0 && source.size == len) + { + /// If current block already contains data, return it. We will be here again on next read() iteration. + if (column_res.size() != 0) + break; + + block_res.getByPosition(0).column = source.block.getByName(name).column; + source.pos += len; + pos_global += len; + break; + } + else if (len == 1) + { + column_res.insertFrom(*source.column, source.pos); + } + else + { + column_res.insertRangeFrom(*source.column, source.pos, len); + } + } + + source.pos += len; + pos_global += len; + } + pos_global_start = pos_global; +} + } -- GitLab