提交 c640065f 编写于 作者: A Alexey Milovidov

dbms: don't zero-fill columns with numbers [#METR-8582].

上级 278f5a92
......@@ -114,13 +114,12 @@ public:
return 0;
}
Permutation getPermutation() const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t s = data.size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
return res;
}
};
......
......@@ -261,11 +261,12 @@ public:
}
};
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
Permutation nested_perm = data->getPermutation(reverse, limit);
Permutation nested_perm;
data->getPermutation(reverse, limit, nested_perm);
size_t s = size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
......@@ -286,8 +287,6 @@ public:
else
std::sort(res.begin(), res.end(), less<true>(*this, nested_perm));
}
return res;
}
void reserve(size_t n)
......
......@@ -120,12 +120,11 @@ public:
: 1);
}
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
return res;
}
StringRef getDataAt(size_t n) const;
......
......@@ -123,10 +123,10 @@ public:
}
};
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t s = size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
......@@ -147,8 +147,6 @@ public:
else
std::sort(res.begin(), res.end(), less<true>(*this));
}
return res;
}
ColumnPtr cut(size_t start, size_t length) const
......
......@@ -250,7 +250,7 @@ public:
throw Exception("Method compareAt is not supported for ColumnNested.", ErrorCodes::NOT_IMPLEMENTED);
}
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
throw Exception("Method getPermutation is not supported for ColumnNested.", ErrorCodes::NOT_IMPLEMENTED);
}
......
......@@ -272,10 +272,10 @@ public:
}
};
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t s = offsets.size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
......@@ -296,8 +296,6 @@ public:
else
std::sort(res.begin(), res.end(), less<true>(*this));
}
return res;
}
template <bool positive>
......@@ -317,10 +315,10 @@ public:
};
/// Сортировка с учетом Collation
Permutation getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit) const
void getPermutationWithCollation(const Collator & collator, bool reverse, size_t limit, Permutation & res) const
{
size_t s = offsets.size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
......@@ -341,8 +339,6 @@ public:
else
std::sort(res.begin(), res.end(), lessWithCollation<true>(*this, collator));
}
return res;
}
ColumnPtr replicate(const Offsets_t & replicate_offsets) const
......
......@@ -175,12 +175,12 @@ public:
}
};
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t rows = size();
Permutation perm(rows);
res.resize(rows);
for (size_t i = 0; i < rows; ++i)
perm[i] = i;
res[i] = i;
if (limit > rows)
limit = 0;
......@@ -188,19 +188,17 @@ public:
if (limit)
{
if (reverse)
std::partial_sort(perm.begin(), perm.begin() + limit, perm.end(), Less<false>(columns));
std::partial_sort(res.begin(), res.begin() + limit, res.end(), Less<false>(columns));
else
std::partial_sort(perm.begin(), perm.begin() + limit, perm.end(), Less<true>(columns));
std::partial_sort(res.begin(), res.begin() + limit, res.end(), Less<true>(columns));
}
else
{
if (reverse)
std::sort(perm.begin(), perm.end(), Less<false>(columns));
std::sort(res.begin(), res.end(), Less<false>(columns));
else
std::sort(perm.begin(), perm.end(), Less<true>(columns));
std::sort(res.begin(), res.end(), Less<true>(columns));
}
return perm;
}
void reserve(size_t n)
......
......@@ -2,8 +2,6 @@
#include <string.h>
#include <boost/type_traits/make_signed.hpp>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
......@@ -85,7 +83,7 @@ private:
typedef ColumnVectorBase<T> Self;
public:
typedef T value_type;
typedef std::vector<value_type> Container_t;
typedef PODArray<value_type> Container_t;
ColumnVectorBase() {}
ColumnVectorBase(size_t n) : data(n) {}
......@@ -144,10 +142,10 @@ public:
bool operator()(size_t lhs, size_t rhs) const { return CompareHelper<T>::greater(parent.data[lhs], parent.data[rhs]); }
};
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
size_t s = data.size();
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
......@@ -168,8 +166,6 @@ public:
else
std::sort(res.begin(), res.end(), less(*this));
}
return res;
}
void reserve(size_t n)
......
......@@ -2,6 +2,8 @@
#include <Poco/SharedPtr.h>
#include <DB/Common/PODArray.h>
#include <DB/Core/Field.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
......@@ -127,14 +129,14 @@ public:
/** Оставить только значения, соответствующие фильтру.
* Используется для операции WHERE / HAVING.
*/
typedef std::vector<UInt8> Filter;
typedef PODArray<UInt8> Filter;
virtual SharedPtr<IColumn> filter(const Filter & filt) const = 0;
/** Переставить значения местами, используя указанную перестановку.
* Используется при сортировке.
* limit - если не равно 0 - положить в результат только первые limit значений.
*/
typedef std::vector<size_t> Permutation;
typedef PODArray<size_t> Permutation;
virtual SharedPtr<IColumn> permute(const Permutation & perm, size_t limit) const = 0;
/** Сравнить (*this)[n] и rhs[m].
......@@ -157,13 +159,13 @@ public:
* reverse - обратный порядок (по возрастанию). limit - если не равно 0 - для частичной сортировки только первых значений.
* Независимо от порядка, NaN-ы располагаются в конце.
*/
virtual Permutation getPermutation(bool reverse, size_t limit) const = 0;
virtual void getPermutation(bool reverse, size_t limit, Permutation & res) const = 0;
/** Размножить все значения столько раз, сколько прописано в offsets.
* (i-е значение размножается в offsets[i] - offsets[i - 1] значений.)
*/
typedef UInt64 Offset_t;
typedef std::vector<Offset_t> Offsets_t;
typedef PODArray<Offset_t> Offsets_t;
virtual SharedPtr<IColumn> replicate(const Offsets_t & offsets) const = 0;
/** Посчитать минимум и максимум по столбцу.
......
......@@ -58,12 +58,11 @@ public:
return cloneDummy(limit ? std::min(s, limit) : s);
}
Permutation getPermutation(bool reverse, size_t limit) const
void getPermutation(bool reverse, size_t limit, Permutation & res) const
{
Permutation res(s);
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
return res;
}
ColumnPtr replicate(const Offsets_t & offsets) const
......
......@@ -7,6 +7,7 @@
#include <memory>
#include <boost/noncopyable.hpp>
#include <boost/iterator_adaptors.hpp>
#include <Yandex/likely.h>
#include <Yandex/strong_typedef.h>
......@@ -22,9 +23,20 @@ namespace DB
* Предназначен для небольшого количества больших массивов (а не большого количества маленьких).
* А точнее - для использования в ColumnVector.
* Отличается от std::vector тем, что не инициализирует элементы.
* Сделано noncopyable, чтобы не было случайных копий. Скопировать данные можно с помощью метода assign.
*
* Сделан некопируемым, чтобы не было случайных копий. Скопировать данные можно с помощью метода assign.
*
* Поддерживается только часть интерфейса std::vector.
*
* Конструктор по-умолчанию создаёт пустой объект, который не выделяет память.
* Затем выделяется память минимум под initial_size элементов.
*
* При первом выделении памяти использует std::allocator.
* В реализации из libstdc++ он кэширует куски памяти несколько больше, чем обычный malloc.
*
* При изменении размера, использует realloc, который может (но не обязан) использовать mremap для больших кусков памяти.
* По факту, mremap используется при использовании аллокатора из glibc, но не используется, например, в tcmalloc.
*
* Если вставлять элементы push_back-ом, не делая reserve, то PODArray примерно в 2.5 раза быстрее std::vector.
*/
template <typename T>
......@@ -69,6 +81,12 @@ private:
void alloc(size_t n)
{
if (n == 0)
{
c_start = c_end = c_end_of_storage = NULL;
return;
}
size_t bytes_to_alloc = to_size(n);
c_start = c_end = Allocator::allocate(bytes_to_alloc);
c_end_of_storage = c_start + bytes_to_alloc;
......@@ -78,6 +96,9 @@ private:
void dealloc()
{
if (c_start == NULL)
return;
if (use_libc_realloc)
::free(c_start);
else
......@@ -87,6 +108,12 @@ private:
void realloc(size_t n)
{
// std::cerr << "realloc" << std::endl;
if (c_start == NULL)
{
alloc(n);
return;
}
ptrdiff_t end_diff = c_end - c_start;
size_t bytes_to_alloc = to_size(n);
......@@ -125,55 +152,26 @@ public:
/// Просто typedef нельзя, так как возникает неоднозначность для конструкторов и функций assign.
class iterator
struct iterator : public boost::iterator_adaptor<iterator, T*>
{
private:
T * ptr;
public:
iterator() {}
iterator(T * ptr_) : ptr(ptr_) {}
bool operator== (const iterator & rhs) const { return ptr == rhs.ptr; }
bool operator!= (const iterator & rhs) const { return ptr != rhs.ptr; }
iterator & operator++() { ++ptr; return *this; }
T & operator* () const { return *ptr; }
T * operator->() const { return ptr; }
iterator operator+ (ptrdiff_t diff) const { return ptr + diff; }
iterator operator- (ptrdiff_t diff) const { return ptr - diff; }
ptrdiff_t operator- (const iterator & rhs) const { return ptr - rhs.ptr; }
iterator(T * ptr_) : iterator::iterator_adaptor_(ptr_) {}
};
class const_iterator
struct const_iterator : public boost::iterator_adaptor<const_iterator, const T*>
{
private:
const T * ptr;
public:
const_iterator() {}
const_iterator(const T * ptr_) : ptr(ptr_) {}
bool operator== (const const_iterator & rhs) const { return ptr == rhs.ptr; }
bool operator!= (const const_iterator & rhs) const { return ptr != rhs.ptr; }
const_iterator & operator++() { ++ptr; return *this; }
const T & operator* () const { return *ptr; }
const T * operator->() const { return ptr; }
const_iterator operator+ (ptrdiff_t diff) const { return ptr + diff; }
const_iterator operator- (ptrdiff_t diff) const { return ptr - diff; }
ptrdiff_t operator- (const const_iterator & rhs) const { return ptr - rhs.ptr; }
const_iterator(const T * ptr_) : const_iterator::iterator_adaptor_(ptr_) {}
};
PODArray() : use_libc_realloc(false) { alloc(initial_size); }
PODArray() : use_libc_realloc(false) { alloc(0); }
PODArray(size_t n) : use_libc_realloc(false) { alloc(n); c_end += byte_size(n); }
PODArray(size_t n, const T & x) : use_libc_realloc(false) { alloc(n); assign(n, x); }
PODArray(const_iterator from_begin, const_iterator from_end) : use_libc_realloc(false) { alloc(from_end - from_begin); insert(from_begin, from_end); }
~PODArray() { dealloc(); }
size_t size() const { return t_end() - t_start(); }
bool empty() const { return t_end() == t_start(); }
size_t capacity() const { return t_end_of_storage() - t_start(); }
......@@ -199,7 +197,10 @@ public:
void reserve()
{
realloc(size() * 2);
if (size() == 0)
realloc(initial_size);
else
realloc(size() * 2);
}
void resize(size_t n)
......@@ -235,7 +236,8 @@ public:
}
/// Не вставляйте в массив кусок самого себя. Потому что при ресайзе, итераторы на самого себя могут инвалидироваться.
void insert(const_iterator from_begin, const_iterator from_end)
template <typename It1, typename It2>
void insert(It1 from_begin, It2 from_end)
{
size_t required_capacity = size() + (from_end - from_begin);
if (required_capacity > capacity())
......@@ -259,14 +261,15 @@ public:
std::fill(begin(), end(), x);
}
void assign(const_iterator from_begin, const_iterator from_end)
template <typename It1, typename It2>
void assign(It1 from_begin, It2 from_end)
{
size_t required_capacity = from_end - from_begin;
if (required_capacity > capacity())
reserve(round_up_to_power_of_two(required_capacity));
size_t bytes_to_copy = byte_size(required_capacity);
memcpy(c_start, reinterpret_cast<const void *>(from_begin), bytes_to_copy);
memcpy(c_start, reinterpret_cast<const void *>(&*from_begin), bytes_to_copy);
c_end = c_start + bytes_to_copy;
}
......@@ -274,6 +277,32 @@ public:
{
assign(from.begin(), from.end());
}
bool operator== (const PODArray<T> & other) const
{
if (size() != other.size())
return false;
const_iterator this_it = begin();
const_iterator that_it = other.begin();
while (this_it != end())
{
if (*this_it != *that_it)
return false;
++this_it;
++that_it;
}
return true;
}
bool operator!= (const PODArray<T> & other) const
{
return !operator==(other);
}
};
template <typename T>
......
......@@ -18,21 +18,21 @@ struct BinaryOperationImpl
{
typedef typename Op::ResultType ResultType;
static void vector_vector(const std::vector<A> & a, const std::vector<B> & b, std::vector<ResultType> & c)
static void vector_vector(const PODArray<A> & a, const PODArray<B> & b, PODArray<ResultType> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::apply(a[i], b[i]);
}
static void vector_constant(const std::vector<A> & a, B b, std::vector<ResultType> & c)
static void vector_constant(const PODArray<A> & a, B b, PODArray<ResultType> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::apply(a[i], b);
}
static void constant_vector(A a, const std::vector<B> & b, std::vector<ResultType> & c)
static void constant_vector(A a, const PODArray<B> & b, PODArray<ResultType> & c)
{
size_t size = b.size();
for (size_t i = 0; i < size; ++i)
......@@ -50,7 +50,7 @@ struct UnaryOperationImpl
{
typedef typename Op::ResultType ResultType;
static void vector(const std::vector<A> & a, std::vector<ResultType> & c)
static void vector(const PODArray<A> & a, PODArray<ResultType> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
......
......@@ -72,9 +72,9 @@ template <typename T>
struct ArrayElementNumImpl
{
static void vector(
const std::vector<T> & data, const ColumnArray::Offsets_t & offsets,
const PODArray<T> & data, const ColumnArray::Offsets_t & offsets,
const ColumnArray::Offset_t index, /// Передаётся индекс начиная с нуля, а не с единицы.
std::vector<T> & result)
PODArray<T> & result)
{
size_t size = offsets.size();
result.resize(size);
......@@ -86,7 +86,8 @@ struct ArrayElementNumImpl
if (index < array_size)
result[i] = data[current_offset + index];
/// Иначе - ничего не делаем (оставим значение по-умолчанию, которое уже лежит в векторе).
else
result[i] = T();
current_offset = offsets[i];
}
......@@ -287,9 +288,9 @@ template <typename T, typename IndexConv>
struct ArrayIndexNumImpl
{
static void vector(
const std::vector<T> & data, const ColumnArray::Offsets_t & offsets,
const PODArray<T> & data, const ColumnArray::Offsets_t & offsets,
const T value,
std::vector<typename IndexConv::ResultType> & result)
PODArray<typename IndexConv::ResultType> & result)
{
size_t size = offsets.size();
result.resize(size);
......@@ -321,7 +322,7 @@ struct ArrayIndexStringImpl
static void vector(
const ColumnString::Chars_t & data, const ColumnArray::Offsets_t & offsets, const ColumnString::Offsets_t & string_offsets,
const String & value,
std::vector<typename IndexConv::ResultType> & result)
PODArray<typename IndexConv::ResultType> & result)
{
size_t size = offsets.size();
size_t value_size = value.size();
......
......@@ -22,7 +22,7 @@ template <typename A, typename B, typename ResultType>
struct NumIfImpl
{
private:
static std::vector<ResultType>& result_vector(Block & block, size_t result, size_t size)
static PODArray<ResultType> & result_vector(Block & block, size_t result, size_t size)
{
ColumnVector<ResultType> * col_res = new ColumnVector<ResultType>;
block.getByPosition(result).column = col_res;
......@@ -34,49 +34,49 @@ private:
}
public:
static void vector_vector(
const std::vector<UInt8> & cond,
const std::vector<A> & a, const std::vector<B> & b,
const PODArray<UInt8> & cond,
const PODArray<A> & a, const PODArray<B> & b,
Block & block,
size_t result)
{
size_t size = cond.size();
std::vector<ResultType> & res = result_vector(block, result, size);
PODArray<ResultType> & res = result_vector(block, result, size);
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b[i]);
}
static void vector_constant(
const std::vector<UInt8> & cond,
const std::vector<A> & a, B b,
const PODArray<UInt8> & cond,
const PODArray<A> & a, B b,
Block & block,
size_t result)
{
size_t size = cond.size();
std::vector<ResultType> & res = result_vector(block, result, size);
PODArray<ResultType> & res = result_vector(block, result, size);
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a[i]) : static_cast<ResultType>(b);
}
static void constant_vector(
const std::vector<UInt8> & cond,
A a, const std::vector<B> & b,
const PODArray<UInt8> & cond,
A a, const PODArray<B> & b,
Block & block,
size_t result)
{
size_t size = cond.size();
std::vector<ResultType> & res = result_vector(block, result, size);
PODArray<ResultType> & res = result_vector(block, result, size);
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b[i]);
}
static void constant_constant(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
A a, B b,
Block & block,
size_t result)
{
size_t size = cond.size();
std::vector<ResultType> & res = result_vector(block, result, size);
PODArray<ResultType> & res = result_vector(block, result, size);
for (size_t i = 0; i < size; ++i)
res[i] = cond[i] ? static_cast<ResultType>(a) : static_cast<ResultType>(b);
}
......@@ -92,8 +92,8 @@ private:
}
public:
static void vector_vector(
const std::vector<UInt8> & cond,
const std::vector<A> & a, const std::vector<B> & b,
const PODArray<UInt8> & cond,
const PODArray<A> & a, const PODArray<B> & b,
Block & block,
size_t result)
{
......@@ -101,8 +101,8 @@ public:
}
static void vector_constant(
const std::vector<UInt8> & cond,
const std::vector<A> & a, B b,
const PODArray<UInt8> & cond,
const PODArray<A> & a, B b,
Block & block,
size_t result)
{
......@@ -110,8 +110,8 @@ public:
}
static void constant_vector(
const std::vector<UInt8> & cond,
A a, const std::vector<B> & b,
const PODArray<UInt8> & cond,
A a, const PODArray<B> & b,
Block & block,
size_t result)
{
......@@ -119,7 +119,7 @@ public:
}
static void constant_constant(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
A a, B b,
Block & block,
size_t result)
......@@ -132,7 +132,7 @@ public:
struct StringIfImpl
{
static void vector_vector(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
const ColumnString::Chars_t & a_data, const ColumnString::Offsets_t & a_offsets,
const ColumnString::Chars_t & b_data, const ColumnString::Offsets_t & b_offsets,
ColumnString::Chars_t & c_data, ColumnString::Offsets_t & c_offsets)
......@@ -170,7 +170,7 @@ struct StringIfImpl
}
static void vector_constant(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
const ColumnString::Chars_t & a_data, const ColumnString::Offsets_t & a_offsets,
const String & b,
ColumnString::Chars_t & c_data, ColumnString::Offsets_t & c_offsets)
......@@ -206,7 +206,7 @@ struct StringIfImpl
}
static void constant_vector(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
const String & a,
const ColumnString::Chars_t & b_data, const ColumnString::Offsets_t & b_offsets,
ColumnString::Chars_t & c_data, ColumnString::Offsets_t & c_offsets)
......@@ -242,7 +242,7 @@ struct StringIfImpl
}
static void constant_constant(
const std::vector<UInt8> & cond,
const PODArray<UInt8> & cond,
const String & a, const String & b,
ColumnString::Chars_t & c_data, ColumnString::Offsets_t & c_offsets)
{
......
......@@ -302,8 +302,8 @@ template <typename DurationType>
struct TimeSlotsImpl
{
static void vector_vector(
const std::vector<UInt32> & starts, const std::vector<DurationType> & durations,
std::vector<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
const PODArray<UInt32> & starts, const PODArray<DurationType> & durations,
PODArray<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
{
size_t size = starts.size();
......@@ -324,8 +324,8 @@ struct TimeSlotsImpl
}
static void vector_constant(
const std::vector<UInt32> & starts, DurationType duration,
std::vector<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
const PODArray<UInt32> & starts, DurationType duration,
PODArray<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
{
size_t size = starts.size();
......@@ -346,8 +346,8 @@ struct TimeSlotsImpl
}
static void constant_vector(
UInt32 start, const std::vector<DurationType> & durations,
std::vector<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
UInt32 start, const PODArray<DurationType> & durations,
PODArray<UInt32> & result_values, ColumnArray::Offsets_t & result_offsets)
{
size_t size = durations.size();
......
......@@ -14,21 +14,21 @@ namespace DB
template<typename A, typename B>
struct AndImpl
{
static void vector_vector(const std::vector<A> & a, const std::vector<B> & b, std::vector<UInt8> & c)
static void vector_vector(const PODArray<A> & a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = a[i] && b[i];
}
static void vector_constant(const std::vector<A> & a, B b, std::vector<UInt8> & c)
static void vector_constant(const PODArray<A> & a, B b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = a[i] && b;
}
static void constant_vector(A a, const std::vector<B> & b, std::vector<UInt8> & c)
static void constant_vector(A a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = b.size();
for (size_t i = 0; i < size; ++i)
......@@ -44,21 +44,21 @@ struct AndImpl
template<typename A, typename B>
struct OrImpl
{
static void vector_vector(const std::vector<A> & a, const std::vector<B> & b, std::vector<UInt8> & c)
static void vector_vector(const PODArray<A> & a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = a[i] || b[i];
}
static void vector_constant(const std::vector<A> & a, B b, std::vector<UInt8> & c)
static void vector_constant(const PODArray<A> & a, B b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = a[i] || b;
}
static void constant_vector(A a, const std::vector<B> & b, std::vector<UInt8> & c)
static void constant_vector(A a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = b.size();
for (size_t i = 0; i < size; ++i)
......@@ -74,21 +74,21 @@ struct OrImpl
template<typename A, typename B>
struct XorImpl
{
static void vector_vector(const std::vector<A> & a, const std::vector<B> & b, std::vector<UInt8> & c)
static void vector_vector(const PODArray<A> & a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = (a[i] && !b[i]) || (!a[i] && b[i]);
}
static void vector_constant(const std::vector<A> & a, B b, std::vector<UInt8> & c)
static void vector_constant(const PODArray<A> & a, B b, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = (a[i] && !b) || (!a[i] && b);
}
static void constant_vector(A a, const std::vector<B> & b, std::vector<UInt8> & c)
static void constant_vector(A a, const PODArray<B> & b, PODArray<UInt8> & c)
{
size_t size = b.size();
for (size_t i = 0; i < size; ++i)
......@@ -104,7 +104,7 @@ struct XorImpl
template<typename A>
struct NotImpl
{
static void vector(const std::vector<A> & a, std::vector<UInt8> & c)
static void vector(const PODArray<A> & a, PODArray<UInt8> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
......
......@@ -43,7 +43,7 @@ struct RandImpl
{
typedef UInt32 ReturnType;
static void execute(std::vector<ReturnType> & res)
static void execute(PODArray<ReturnType> & res)
{
drand48_data rand_state;
detail::seed(rand_state);
......@@ -62,7 +62,7 @@ struct Rand64Impl
{
typedef UInt64 ReturnType;
static void execute(std::vector<ReturnType> & res)
static void execute(PODArray<ReturnType> & res)
{
drand48_data rand_state;
detail::seed(rand_state);
......
......@@ -46,7 +46,7 @@ template <bool negative = false>
struct EmptyImpl
{
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
std::vector<UInt8> & res)
PODArray<UInt8> & res)
{
size_t size = offsets.size();
ColumnString::Offset_t prev_offset = 1;
......@@ -64,7 +64,7 @@ struct EmptyImpl
}
static void vector_fixed_to_vector(const ColumnString::Chars_t & data, size_t n,
std::vector<UInt8> & res)
PODArray<UInt8> & res)
{
}
......@@ -73,7 +73,7 @@ struct EmptyImpl
res = negative ^ (data.empty());
}
static void array(const ColumnString::Offsets_t & offsets, std::vector<UInt8> & res)
static void array(const ColumnString::Offsets_t & offsets, PODArray<UInt8> & res)
{
size_t size = offsets.size();
ColumnString::Offset_t prev_offset = 0;
......@@ -96,7 +96,7 @@ struct EmptyImpl
struct LengthImpl
{
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
size_t size = offsets.size();
for (size_t i = 0; i < size; ++i)
......@@ -112,7 +112,7 @@ struct LengthImpl
}
static void vector_fixed_to_vector(const ColumnString::Chars_t & data, size_t n,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
}
......@@ -121,7 +121,7 @@ struct LengthImpl
res = data.size();
}
static void array(const ColumnString::Offsets_t & offsets, std::vector<UInt64> & res)
static void array(const ColumnString::Offsets_t & offsets, PODArray<UInt64> & res)
{
size_t size = offsets.size();
for (size_t i = 0; i < size; ++i)
......@@ -144,7 +144,7 @@ struct LengthImpl
struct LengthUTF8Impl
{
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
size_t size = offsets.size();
......@@ -165,7 +165,7 @@ struct LengthUTF8Impl
}
static void vector_fixed_to_vector(const ColumnString::Chars_t & data, size_t n,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
size_t size = data.size() / n;
......@@ -186,7 +186,7 @@ struct LengthUTF8Impl
++res;
}
static void array(const ColumnString::Offsets_t & offsets, std::vector<UInt64> & res)
static void array(const ColumnString::Offsets_t & offsets, PODArray<UInt64> & res)
{
throw Exception("Cannot apply function lengthUTF8 to Array argument", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
......@@ -207,7 +207,7 @@ struct LowerUpperImpl
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
{
res_data.resize(data.size());
res_offsets = offsets;
res_offsets.assign(offsets);
array(&*data.begin(), &*data.end(), &*res_data.begin());
}
......@@ -246,7 +246,7 @@ struct LowerUpperUTF8Impl
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
{
res_data.resize(data.size());
res_offsets = offsets;
res_offsets.assign(offsets);
array(&*data.begin(), &*data.end(), &*res_data.begin());
}
......@@ -295,7 +295,7 @@ struct ReverseImpl
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
{
res_data.resize(data.size());
res_offsets = offsets;
res_offsets.assign(offsets);
size_t size = offsets.size();
ColumnString::Offset_t prev_offset = 0;
......@@ -338,7 +338,7 @@ struct ReverseUTF8Impl
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
{
res_data.resize(data.size());
res_offsets = offsets;
res_offsets.assign(offsets);
size_t size = offsets.size();
ColumnString::Offset_t prev_offset = 0;
......@@ -477,7 +477,7 @@ struct ConcatImpl
{
size_t size = a_offsets.size();
c_data.resize(a_data.size() + b.size() * size);
c_offsets = a_offsets;
c_offsets.assign(a_offsets);
for (size_t i = 0; i < size; ++i)
c_offsets[i] += b.size() * (i + 1);
......@@ -575,7 +575,7 @@ struct ConcatImpl
{
size_t size = b_offsets.size();
c_data.resize(b_data.size() + a.size() * size);
c_offsets = b_offsets;
c_offsets.assign(b_offsets);
for (size_t i = 0; i < size; ++i)
c_offsets[i] += a.size() * (i + 1);
......
......@@ -45,7 +45,7 @@ struct PositionImpl
/// Предполагается, что res нужного размера и инициализирован нулями.
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
const std::string & needle,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
const UInt8 * begin = &data[0];
const UInt8 * pos = begin;
......@@ -61,15 +61,22 @@ struct PositionImpl
{
/// Определим, к какому индексу оно относится.
while (begin + offsets[i] < pos)
{
res[i] = 0;
++i;
}
/// Проверяем, что вхождение не переходит через границы строк.
if (pos + needle.size() < begin + offsets[i])
res[i] = (i != 0) ? pos - begin - offsets[i - 1] + 1 : (pos - begin + 1);
else
res[i] = 0;
pos = begin + offsets[i];
++i;
}
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
static void constant(const std::string & data, const std::string & needle, UInt64 & res)
......@@ -89,7 +96,7 @@ struct PositionUTF8Impl
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
const std::string & needle,
std::vector<UInt64> & res)
PODArray<UInt64> & res)
{
const UInt8 * begin = &data[0];
const UInt8 * pos = begin;
......@@ -105,7 +112,10 @@ struct PositionUTF8Impl
{
/// Определим, к какому индексу оно относится.
while (begin + offsets[i] < pos)
{
res[i] = 0;
++i;
}
/// Проверяем, что вхождение не переходит через границы строк.
if (pos + needle.size() < begin + offsets[i])
......@@ -116,10 +126,14 @@ struct PositionUTF8Impl
if (*c <= 0x7F || *c >= 0xC0)
++res[i];
}
else
res[i] = 0;
pos = begin + offsets[i];
++i;
}
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
static void constant(const std::string & data, const std::string & needle, UInt64 & res)
......@@ -268,16 +282,12 @@ struct MatchImpl
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
const std::string & pattern,
std::vector<UInt8> & res)
PODArray<UInt8> & res)
{
String strstr_pattern;
/// Простой случай, когда выражение LIKE сводится к поиску подстроки в строке
if (like && likePatternIsStrstr(pattern, strstr_pattern))
{
/// Если отрицание - то заполним вектор единицами (вместо имеющихся там нулей)
if (revert)
memset(&res[0], 1, offsets.size());
const UInt8 * begin = &data[0];
const UInt8 * pos = begin;
const UInt8 * end = pos + data.size();
......@@ -292,15 +302,22 @@ struct MatchImpl
{
/// Определим, к какому индексу оно относится.
while (begin + offsets[i] < pos)
{
res[i] = revert;
++i;
}
/// Проверяем, что вхождение не переходит через границы строк.
if (pos + strstr_pattern.size() < begin + offsets[i])
res[i] = !revert;
else
res[i] = revert;
pos = begin + offsets[i];
++i;
}
memset(&res[i], revert, (res.size() - i) * sizeof(res[0]));
}
else
{
......
......@@ -288,7 +288,7 @@ struct ExtractParamImpl
/// Предполагается, что res нужного размера и инициализирован нулями.
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
std::string needle,
std::vector<ResultType> & res)
PODArray<ResultType> & res)
{
/// Ищем параметр просто как подстроку вида "name":
needle = "\"" + needle + "\":";
......@@ -307,15 +307,22 @@ struct ExtractParamImpl
{
/// Определим, к какому индексу оно относится.
while (begin + offsets[i] < pos)
{
res[i] = 0;
++i;
}
/// Проверяем, что вхождение не переходит через границы строк.
if (pos + needle.size() < begin + offsets[i])
res[i] = ParamExtractor::extract(pos + needle.size(), begin + offsets[i]);
else
res[i] = 0;
pos = begin + offsets[i];
++i;
}
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
static void constant(const std::string & data, std::string needle, ResultType & res)
......
......@@ -9,7 +9,7 @@ namespace DB
template <typename T>
static void numWidthVector(const std::vector<T> & a, std::vector<UInt64> & c)
static void numWidthVector(const PODArray<T> & a, PODArray<UInt64> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
......@@ -46,7 +46,7 @@ inline UInt64 floatWidth(double x)
}
template <typename T>
static void floatWidthVector(const std::vector<T> & a, std::vector<UInt64> & c)
static void floatWidthVector(const PODArray<T> & a, PODArray<UInt64> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
......@@ -59,13 +59,13 @@ static void floatWidthConstant(T a, UInt64 & c)
c = floatWidth(a);
}
template <> inline void numWidthVector<Float64>(const std::vector<Float64> & a, std::vector<UInt64> & c) { floatWidthVector(a, c); }
template <> inline void numWidthVector<Float32>(const std::vector<Float32> & a, std::vector<UInt64> & c) { floatWidthVector(a, c); }
template <> inline void numWidthVector<Float64>(const PODArray<Float64> & a, PODArray<UInt64> & c) { floatWidthVector(a, c); }
template <> inline void numWidthVector<Float32>(const PODArray<Float32> & a, PODArray<UInt64> & c) { floatWidthVector(a, c); }
template <> inline void numWidthConstant<Float64>(Float64 a, UInt64 & c) { floatWidthConstant(a, c); }
template <> inline void numWidthConstant<Float32>(Float32 a, UInt64 & c) { floatWidthConstant(a, c); }
static inline void stringWidthVector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets, std::vector<UInt64> & res)
static inline void stringWidthVector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets, PODArray<UInt64> & res)
{
size_t size = offsets.size();
......@@ -77,7 +77,7 @@ static inline void stringWidthVector(const ColumnString::Chars_t & data, const C
}
}
static inline void stringWidthFixedVector(const ColumnString::Chars_t & data, size_t n, std::vector<UInt64> & res)
static inline void stringWidthFixedVector(const ColumnString::Chars_t & data, size_t n, PODArray<UInt64> & res)
{
size_t size = data.size() / n;
for (size_t i = 0; i < size; ++i)
......
......@@ -81,10 +81,10 @@ void sortBlock(Block & block, const SortDescription & description, size_t limit)
if (needCollation(column, description[0]))
{
const ColumnString & column_string = dynamic_cast<const ColumnString &>(*column);
perm = column_string.getPermutationWithCollation(*description[0].collator, reverse, limit);
column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
}
else
perm = column->getPermutation(reverse, limit);
column->getPermutation(reverse, limit, perm);
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册