提交 1cd93657 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

introduce IColumn::gather() [#CLICKHOUSE-3118]

上级 807bd7cb
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Common/SipHash.h>
namespace DB
......@@ -340,6 +341,11 @@ void ColumnAggregateFunction::getPermutation(bool reverse, size_t limit, int nan
res[i] = i;
}
void ColumnAggregateFunction::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
{
throw Exception("Method getExtremes is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
......
......@@ -151,6 +151,8 @@ public:
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override
{
return 0;
......
......@@ -7,6 +7,8 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Common/Exception.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
......@@ -608,7 +610,6 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
return res;
}
void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
{
size_t s = size();
......@@ -888,4 +889,9 @@ ColumnPtr ColumnArray::replicateTuple(const Offsets_t & replicate_offsets) const
}
void ColumnArray::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
}
......@@ -80,6 +80,8 @@ public:
return scatterImpl<ColumnArray>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
private:
ColumnPtr data;
ColumnPtr offsets; /// Displacements can be shared across multiple columns - to implement nested data structures.
......
......@@ -207,6 +207,11 @@ public:
res[i] = i;
}
void gather(ColumnGathererStream &) override
{
throw Exception("Cannot gather into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
DataTypePtr & getDataType() { return data_type; }
const DataTypePtr & getDataType() const { return data_type; }
};
......
......@@ -82,6 +82,11 @@ public:
ColumnPtr replicate(const Offsets_t & offsets) const override;
void gather(ColumnGathererStream &) override
{
throw Exception("Cannot gather into constant column " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void getExtremes(Field & min, Field & max) const override;
size_t byteSize() const override;
......
#include <Columns/ColumnFixedString.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Columns/ColumnFixedString.h>
#include <IO/WriteHelpers.h>
#if __SSE2__
#include <emmintrin.h>
......@@ -278,6 +280,11 @@ ColumnPtr ColumnFixedString::replicate(const Offsets_t & offsets) const
return res;
}
void ColumnFixedString::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnFixedString::getExtremes(Field & min, Field & max) const
{
min = String();
......
......@@ -120,6 +120,8 @@ public:
return scatterImpl<ColumnFixedString>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
void reserve(size_t size) override
{
chars.reserve(n * size);
......
......@@ -5,6 +5,7 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnAggregateFunction.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB
......@@ -298,6 +299,11 @@ void ColumnNullable::getPermutation(bool reverse, size_t limit, int null_directi
}
}
void ColumnNullable::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnNullable::reserve(size_t n)
{
nested_column->reserve(n);
......
......@@ -67,6 +67,8 @@ public:
return scatterImpl<ColumnNullable>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
/// Return the column that represents values.
ColumnPtr & getNestedColumn() { return nested_column; }
const ColumnPtr & getNestedColumn() const { return nested_column; }
......
......@@ -3,6 +3,7 @@
#include <Common/Collator.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsCommon.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB
......@@ -247,6 +248,12 @@ ColumnPtr ColumnString::replicate(const Offsets_t & replicate_offsets) const
}
void ColumnString::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnString::reserve(size_t n)
{
offsets.reserve(n);
......
......@@ -238,6 +238,8 @@ public:
return scatterImpl<ColumnString>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
void reserve(size_t n) override;
void getExtremes(Field & min, Field & max) const override;
......
#include <Columns/ColumnTuple.h>
#include <ext/map.h>
#include <ext/range.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB
......@@ -231,6 +232,11 @@ void ColumnTuple::getPermutation(bool reverse, size_t limit, int nan_direction_h
}
}
void ColumnTuple::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
void ColumnTuple::reserve(size_t n)
{
for (auto & column : columns)
......
......@@ -51,6 +51,7 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr replicate(const Offsets_t & offsets) const override;
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override;
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override;
void getExtremes(Field & min, Field & max) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
......
......@@ -11,6 +11,8 @@
#include <Columns/ColumnVector.h>
#include <DataStreams/ColumnGathererStream.h>
#include <ext/bit_cast.h>
#if __SSE2__
......@@ -255,6 +257,12 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets_t & offsets) const
return res;
}
template <typename T>
void ColumnVector<T>::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
template <typename T>
void ColumnVector<T>::getExtremes(Field & min, Field & max) const
{
......
......@@ -241,6 +241,7 @@ public:
return this->scatterImpl<Self>(num_columns, selector);
}
void gather(ColumnGathererStream & gatherer_stream) override;
/** More efficient methods of manipulation - to manipulate with data directly. */
Container_t & getData()
......
......@@ -31,7 +31,7 @@ using ColumnPlainPtrs = std::vector<IColumn *>;
using ConstColumnPlainPtrs = std::vector<const IColumn *>;
class Arena;
class ColumnGathererStream;
/// Declares interface to store columns in memory.
class IColumn : private boost::noncopyable
......@@ -225,6 +225,12 @@ public:
using Selector = PaddedPODArray<ColumnIndex>;
virtual Columns scatter(ColumnIndex num_columns, const Selector & selector) const = 0;
/// Insert data from several other columns according to source mask (used in vertical merge).
/// For now it is a helper to de-virtualize calls to insert*() functions inside gather loop
/// (descendants should call gatherer_stream.gather(*this) to implement this function.)
/// TODO: interface decoupled from ColumnGathererStream that allows non-generic specializations.
virtual void gather(ColumnGathererStream & gatherer_stream) = 0;
/** Computes minimum and maximum element of the column.
* In addition to numeric types, the funtion is completely implemented for Date and DateTime.
* For strings and arrays function should retrurn default value.
......
......@@ -103,6 +103,11 @@ public:
return res;
}
void gather(ColumnGathererStream &) override
{
throw Exception("Method gather is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void getExtremes(Field & min, Field & max) const override
{
throw Exception("Method getExtremes is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
......
......@@ -78,61 +78,12 @@ Block ColumnGathererStream::readImpl()
if (pos_global_start >= row_source.size())
return Block();
Block block_res{column.cloneEmpty()};
block_res = Block{column.cloneEmpty()};
IColumn & column_res = *block_res.getByPosition(0).column;
size_t global_size = row_source.size();
size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size);
column_res.reserve(curr_block_preferred_size);
column_res.gather(*this);
size_t pos_global = pos_global_start;
while (pos_global < global_size && column_res.size() < curr_block_preferred_size)
{
auto source_data = row_source[pos_global].getData();
bool source_skip = row_source[pos_global].getSkipFlag();
auto source_num = row_source[pos_global].getSourceNum();
Source & source = sources[source_num];
if (source.pos >= source.size) /// Fetch new block from source_num part
{
fetchNewBlock(source, source_num);
}
/// Consecutive optimization. TODO: precompute lens
size_t len = 1;
size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block
for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len);
if (!source_skip)
{
/// Whole block could be produced via copying pointer from current block
if (source.pos == 0 && source.size == len)
{
/// If current block already contains data, return it. We will be here again on next read() iteration.
if (column_res.size() != 0)
break;
block_res.getByPosition(0).column = source.block.getByName(name).column;
source.pos += len;
pos_global += len;
break;
}
else if (len == 1)
{
column_res.insertFrom(*source.column, source.pos);
}
else
{
column_res.insertRangeFrom(*source.column, source.pos, len);
}
}
source.pos += len;
pos_global += len;
}
pos_global_start = pos_global;
return block_res;
return std::move(block_res);
}
......
......@@ -70,8 +70,11 @@ public:
void readSuffixImpl() override;
private:
/// for use in implementations of IColumn::gather()
template<typename Column>
void gather(Column & column_res);
private:
String name;
ColumnWithTypeAndName column;
const MergedRowSources & row_source;
......@@ -105,7 +108,64 @@ private:
size_t pos_global_start = 0;
size_t block_preferred_size;
Block block_res;
Poco::Logger * log;
};
template<typename Column>
void ColumnGathererStream::gather(Column & column_res)
{
size_t global_size = row_source.size();
size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size);
column_res.reserve(curr_block_preferred_size);
size_t pos_global = pos_global_start;
while (pos_global < global_size && column_res.size() < curr_block_preferred_size)
{
auto source_data = row_source[pos_global].getData();
bool source_skip = row_source[pos_global].getSkipFlag();
auto source_num = row_source[pos_global].getSourceNum();
Source & source = sources[source_num];
if (source.pos >= source.size) /// Fetch new block from source_num part
{
fetchNewBlock(source, source_num);
}
/// Consecutive optimization. TODO: precompute lens
size_t len = 1;
size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block
for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len);
if (!source_skip)
{
/// Whole block could be produced via copying pointer from current block
if (source.pos == 0 && source.size == len)
{
/// If current block already contains data, return it. We will be here again on next read() iteration.
if (column_res.size() != 0)
break;
block_res.getByPosition(0).column = source.block.getByName(name).column;
source.pos += len;
pos_global += len;
break;
}
else if (len == 1)
{
column_res.insertFrom(*source.column, source.pos);
}
else
{
column_res.insertRangeFrom(*source.column, source.pos, len);
}
}
source.pos += len;
pos_global += len;
}
pos_global_start = pos_global;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册