提交 8f541258 编写于 作者: A Alexey Milovidov

Minor modifications #2808

上级 6d509251
......@@ -124,27 +124,24 @@ public:
return res;
}
/// Get peice of memory with alignment
char * alignedAlloc(size_t size, size_t align)
/// Get peice of memory with alignment
char * alignedAlloc(size_t size, size_t alignment)
{
// Fast code path for non-alignment requirement
if (align <= 1)
return alloc(size);
uintptr_t pos = reinterpret_cast<uintptr_t>(head->pos);
// next pos match align requirement
pos = ((pos-1) & ~(align - 1)) + align;
if (unlikely(pos + size > reinterpret_cast<uintptr_t>(head->end)))
do
{
addChunk(size);
pos = reinterpret_cast<uintptr_t>(head->pos);
pos = ((pos-1) & ~(align - 1)) + align;
}
char * res = (char *)pos;
head->pos = res + size;
return res;
void * head_pos = head->pos;
size_t space = head->end - head->pos;
auto res = static_cast<char *>(std::align(alignment, size, head_pos, space));
if (res)
{
head->pos = static_cast<char *>(head_pos);
head->pos += size;
return res;
}
addChunk(size + alignment);
} while (true);
}
/** Rollback just performed allocation.
......
......@@ -48,6 +48,7 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS;
extern const int EMPTY_DATA_PASSED;
extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
extern const int LOGICAL_ERROR;
}
......@@ -163,23 +164,20 @@ Aggregator::Aggregator(const Params & params_)
offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData();
// aggreate states are aligned based on maximum requirement
align_aggregate_states = std::max(align_aggregate_states,
params.aggregates[i].function->alignOfData());
align_aggregate_states = std::max(align_aggregate_states, params.aggregates[i].function->alignOfData());
// If not the last aggregate_state, we need pad it so that next aggregate_state will be
// aligned.
// If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned.
if (i + 1 < params.aggregates_size)
{
size_t next_align_req = params.aggregates[i+1].function->alignOfData();
if ((next_align_req & (next_align_req -1)) != 0)
{
throw Exception("alignOfData is not 2^N");
}
// extend total_size to next alignment requirement
total_size_of_aggregate_states =
(total_size_of_aggregate_states & ~(next_align_req - 1)) + next_align_req;
size_t alignment_of_next_state = params.aggregates[i + 1].function->alignOfData();
if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0)
throw Exception("Logical error: alignOfData is not 2^N", ErrorCodes::LOGICAL_ERROR);
/// Extend total_size to next alignment requirement
/// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state.
total_size_of_aggregate_states = (total_size_of_aggregate_states + alignment_of_next_state - 1) / alignment_of_next_state * alignment_of_next_state;
}
if (!params.aggregates[i].function->hasTrivialDestructor())
......@@ -637,8 +635,7 @@ void NO_INLINE Aggregator::executeImplCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
}
......@@ -756,8 +753,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
result.without_key = place;
}
......@@ -1925,8 +1921,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
aggregate_data = place;
}
......@@ -1977,8 +1972,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
{
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateDataPtr place = result.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
res = place;
}
......
......@@ -1155,8 +1155,8 @@ protected:
Sizes offsets_of_aggregate_states; /// The offset to the n-th aggregate function in a row of aggregate functions.
size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions.
// add info to track aligned requirement
// If there are states whose alignment info were v1, ..vn, align_aggregate_states will be max(v1, ... vn)
// add info to track alignment requirement
// If there are states whose alignmentment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
size_t align_aggregate_states = 1;
bool all_aggregates_has_trivial_destructor = false;
......
......@@ -191,8 +191,7 @@ void NO_INLINE Aggregator::executeSpecializedCase(
method.onNewKey(*it, params.keys_size, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states,
align_aggregate_states);
AggregateDataPtr place = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
AggregateFunctionsList::forEach(AggregateFunctionsCreator(
aggregate_functions, offsets_of_aggregate_states, place));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册