Commit 6b25396e authored by Bhuvnesh Chaudhary, committed by Bhuvnesh

Fix spilling of aggstate with compression

Earlier, while creating the workfile for hash aggregates, the data
was written to the file for each aggstate, and after all of the
aggstate data had been written, the total size was updated in the file.
To update the total size, the code seeked backwards to the offset where
the total size had been written previously.

This worked for workfiles without compression. However, when
gp_workfile_compression=on the workfiles are compressed, and an attempt to
seek back to an earlier offset errors out, since a compressed file is
not expected to support seeking backwards.

This commit fixes the issue by first writing all the data to a buffer, so
that the total size is known up front, and then writing the buffer to the file.
Co-authored-by: Ashwin Agrawal <aagrawal@pivotal.io>
Signed-off-by: Ashwin Agrawal <aagrawal@pivotal.io>
Parent 9de3196f
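
For illustration, here is a minimal standalone sketch of the buffer-then-write pattern this commit adopts, using plain stdio and hypothetical names (buffer_append, flush_entry) in place of the BufFile API; the actual implementation is in writeHashEntry() in the diff below.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Growable buffer that stands in for aggDataBuffer in the real code. */
static char  *buf = NULL;
static size_t bufsize = 1024;   /* initial capacity, grown in 1024-byte steps */
static size_t used = 0;

/* Append one datum to the in-memory buffer instead of writing it to the
 * (possibly compressed) file directly. */
static void
buffer_append(const void *datum, size_t len)
{
    if (buf == NULL)
        buf = malloc(bufsize);
    while (used + len >= bufsize)
    {
        bufsize += 1024;        /* grow until the datum fits */
        buf = realloc(buf, bufsize);
    }
    memcpy(buf + used, datum, len);
    used += len;
}

/* Once every datum has been appended, the total size is known, so the size
 * prefix and the payload can be written in a single forward pass; no
 * backward seek is needed, which is what a compressed stream requires. */
static void
flush_entry(FILE *file)
{
    fwrite(&used, sizeof(used), 1, file);
    fwrite(buf, 1, used, file);
    used = 0;                   /* the buffer is reused for the next entry */
}
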
@@ -1548,7 +1548,6 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
AggStatePerGroup pergroup;
int aggno;
AggStatePerAgg peragg = aggstate->peragg;
int32 aggstateSize = 0;
Size datum_size;
Datum serializedVal;
int16 byteaTranstypeLen = 0;
@@ -1559,27 +1558,34 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
Assert(file_info != NULL);
Assert(file_info->wfile != NULL);
BufFileWriteOrError(file_info->wfile, (void *) &entry->hashvalue, sizeof(entry->hashvalue));
/*
 * Store all the data in a buffer, then write it to the file in one shot. The
 * spill files may be compressed, and we can't go back and update the total size
 * once the data has been written, so it is better to gather all the data first
 * and then write it.
 * aggDataBuffer is a static pointer because we want to initialize it only once,
 * in TopMemoryContext, to avoid the overhead of allocating and freeing it on
 * every call of this function.
 *
 * The buffer starts at 1024 bytes and is grown by another 1024 bytes whenever
 * the current offset plus datum_size would exceed the current buffer size.
 */
static char *aggDataBuffer = NULL;
#define BUFFER_INCREMENT_SIZE 1024
static int aggDataBufferSize = BUFFER_INCREMENT_SIZE;
int32 aggDataOffset = 0;
if (aggDataBuffer == NULL)
aggDataBuffer = MemoryContextAlloc(TopMemoryContext, aggDataBufferSize);
tuple_agg_size = memtuple_get_size((MemTuple)entry->tuple_and_aggs);
pergroup = (AggStatePerGroup) ((char *)entry->tuple_and_aggs + MAXALIGN(tuple_agg_size));
tuple_agg_size = MAXALIGN(tuple_agg_size) +
aggstate->numaggs * sizeof(AggStatePerGroupData);
total_size = MAXALIGN(tuple_agg_size);
BufFileWriteOrError(file_info->wfile, (char *) &total_size, sizeof(total_size));
BufFileWriteOrError(file_info->wfile, entry->tuple_and_aggs, tuple_agg_size);
Assert(MAXALIGN(tuple_agg_size) - tuple_agg_size <= MAXIMUM_ALIGNOF);
if (MAXALIGN(tuple_agg_size) - tuple_agg_size > 0)
{
BufFileWriteOrError(file_info->wfile, padding_dummy, MAXALIGN(tuple_agg_size) - tuple_agg_size);
}
aggstate->numaggs * sizeof(AggStatePerGroupData);
/* Write the transition aggstates */
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
char *datum_value = NULL;
/* Skip null transValue */
if (pergroupstate->transValueIsNull)
@@ -1605,14 +1611,9 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
/* Not necessary to do this if the serialization func has no memory leak */
old_ctx = MemoryContextSwitchTo(aggstate->hhashtable->serialization_cxt);
serializedVal = FunctionCallInvoke(&fcinfo);
datum_size = datumGetSize(serializedVal, byteaTranstypeByVal, byteaTranstypeLen);
BufFileWriteOrError(file_info->wfile,
DatumGetPointer(serializedVal), datum_size);
pfree(DatumGetPointer(serializedVal));
datum_value = DatumGetPointer(serializedVal);
MemoryContextSwitchTo(old_ctx);
}
/* If it's a ByRef, write the data to the file */
@@ -1621,8 +1622,7 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
datum_size = datumGetSize(pergroupstate->transValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
BufFileWriteOrError(file_info->wfile,
DatumGetPointer(pergroupstate->transValue), datum_size);
datum_value = DatumGetPointer(pergroupstate->transValue);
}
/* Otherwise it's a real ByVal, do nothing */
else
@@ -1630,34 +1630,36 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
continue;
}
Assert(MAXALIGN(datum_size) - datum_size <= MAXIMUM_ALIGNOF);
if (MAXALIGN(datum_size) - datum_size > 0)
while ((aggDataOffset + MAXALIGN(datum_size)) >= aggDataBufferSize)
{
BufFileWriteOrError(file_info->wfile,
padding_dummy, MAXALIGN(datum_size) - datum_size);
aggDataBufferSize += BUFFER_INCREMENT_SIZE;
MemoryContext oldAggContext = MemoryContextSwitchTo(TopMemoryContext);
aggDataBuffer = repalloc(aggDataBuffer, aggDataBufferSize);
MemoryContextSwitchTo(oldAggContext);
}
aggstateSize += MAXALIGN(datum_size);
}
memcpy((aggDataBuffer + aggDataOffset), datum_value, datum_size);
if (aggstateSize)
{
total_size += aggstateSize;
aggDataOffset += MAXALIGN(datum_size);
/* Rewind to write the correct total_size */
if (BufFileSeek(file_info->wfile, 0, -(aggstateSize + MAXALIGN(tuple_agg_size) + sizeof(total_size)), SEEK_CUR) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in hash agg temporary file: %m")));
/* if it had a valid serialization function, then free the value */
if (OidIsValid(peraggstate->serialfn_oid))
pfree(datum_value);
}
BufFileWriteOrError(file_info->wfile, (char *) &total_size, sizeof(total_size));
total_size = MAXALIGN(tuple_agg_size) + aggDataOffset;
/* Write the hash value, size header, tuple and buffered aggstates in one shot */
BufFileWriteOrError(file_info->wfile, (void *) &entry->hashvalue, sizeof(entry->hashvalue));
BufFileWriteOrError(file_info->wfile, (char *) &total_size, sizeof(total_size));
BufFileWriteOrError(file_info->wfile, entry->tuple_and_aggs, tuple_agg_size);
Assert(MAXALIGN(tuple_agg_size) - tuple_agg_size <= MAXIMUM_ALIGNOF);
/* Go back to the last offset */
if (BufFileSeek(file_info->wfile, 0, aggstateSize + MAXALIGN(tuple_agg_size), SEEK_CUR) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in hash agg temporary file: %m")));
if (MAXALIGN(tuple_agg_size) - tuple_agg_size > 0)
{
BufFileWriteOrError(file_info->wfile, padding_dummy, MAXALIGN(tuple_agg_size) - tuple_agg_size);
}
/* Write the transition aggstates */
if (aggDataOffset)
BufFileWriteOrError(file_info->wfile, aggDataBuffer, aggDataOffset);
return (total_size + sizeof(total_size) + sizeof(entry->hashvalue));
}
@@ -91,13 +91,6 @@ create table aggspill (i int, j int, t text) distributed by (i);
insert into aggspill select i, i*2, i::text from generate_series(1, 10000) i;
insert into aggspill select i, i*2, i::text from generate_series(1, 100000) i;
insert into aggspill select i, i*2, i::text from generate_series(1, 1000000) i;
-- Test the spilling with serial/deserial functions involved
-- The transition type of numeric is internal, and hence it uses the serial/deserial functions when spilling
drop table if exists aggspill_numeric_avg;
create table aggspill_numeric_avg (a int, b int, c numeric) distributed by (a);
insert into aggspill_numeric_avg (select i, i + 1, i * 1.1111 from generate_series(1, 500000) as i);
insert into aggspill_numeric_avg (select i, i + 1, i * 1.1111 from generate_series(1, 500000) as i);
analyze aggspill_numeric_avg;
-- No spill with large statement memory
set statement_mem = '125MB';
select count(*) from (select i, count(*) from aggspill group by i,j having count(*) = 1) g;
@@ -121,12 +114,6 @@ select count(*) from (select i, count(*) from aggspill group by i,j having count
90000
(1 row)
select count(*) from (select a, avg(b), avg(c) from aggspill_numeric_avg group by a) g;
count
--------
500000
(1 row)
-- Reduce the statement memory, nbatches and entrysize even further to cause multiple overflows
set gp_hashagg_default_nbatches = 4;
set statement_mem = '5MB';
@@ -143,13 +130,34 @@ select count(*) from (select i, count(*) from aggspill group by i,j,t having cou
10000
(1 row)
select count(*) from (select a, avg(b), avg(c) from aggspill_numeric_avg group by a) g;
count
--------
500000
reset optimizer_force_multistage_agg;
-- Test the spilling of aggstates
-- with and without serial/deserial functions
-- with and without workfile compression
-- The transition type of numeric is internal, and hence it uses the serial/deserial functions when spilling
-- The transition value of integer is byref, and it does not have any serial/deserial function when spilling
CREATE TABLE hashagg_spill(col1 numeric, col2 int) DISTRIBUTED BY (col1);
INSERT INTO hashagg_spill SELECT id, 1 FROM generate_series(1,20000) id;
ANALYZE hashagg_spill;
SET statement_mem='1000kB';
SET gp_workfile_compression = OFF;
select overflows >= 1 from hashagg_spill.num_hashagg_overflows('explain analyze
SELECT avg(col2) col2 FROM hashagg_spill GROUP BY col1 HAVING(sum(col1)) < 0;') overflows;
?column?
----------
t
(1 row)
reset optimizer_force_multistage_agg;
SET gp_workfile_compression = ON;
select overflows >= 1 from hashagg_spill.num_hashagg_overflows('explain analyze
SELECT avg(col2) col2 FROM hashagg_spill GROUP BY col1 HAVING(sum(col1)) < 0;') overflows;
?column?
----------
t
(1 row)
RESET statement_mem;
RESET gp_workfile_compression;
drop schema hashagg_spill cascade;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to function hashagg_spill.is_workfile_created(text)
@@ -77,14 +77,6 @@ insert into aggspill select i, i*2, i::text from generate_series(1, 10000) i;
insert into aggspill select i, i*2, i::text from generate_series(1, 100000) i;
insert into aggspill select i, i*2, i::text from generate_series(1, 1000000) i;
-- Test the spilling with serial/deserial functions involved
-- The transition type of numeric is internal, and hence it uses the serial/deserial functions when spilling
drop table if exists aggspill_numeric_avg;
create table aggspill_numeric_avg (a int, b int, c numeric) distributed by (a);
insert into aggspill_numeric_avg (select i, i + 1, i * 1.1111 from generate_series(1, 500000) as i);
insert into aggspill_numeric_avg (select i, i + 1, i * 1.1111 from generate_series(1, 500000) as i);
analyze aggspill_numeric_avg;
-- No spill with large statement memory
set statement_mem = '125MB';
select count(*) from (select i, count(*) from aggspill group by i,j having count(*) = 1) g;
@@ -94,7 +86,6 @@ set statement_mem = '10MB';
select overflows >= 1 from hashagg_spill.num_hashagg_overflows('explain analyze
select count(*) from (select i, count(*) from aggspill group by i,j having count(*) = 2) g') overflows;
select count(*) from (select i, count(*) from aggspill group by i,j having count(*) = 2) g;
select count(*) from (select a, avg(b), avg(c) from aggspill_numeric_avg group by a) g;
-- Reduce the statement memory, nbatches and entrysize even further to cause multiple overflows
set gp_hashagg_default_nbatches = 4;
@@ -104,7 +95,26 @@ select overflows > 1 from hashagg_spill.num_hashagg_overflows('explain analyze
select count(*) from (select i, count(*) from aggspill group by i,j,t having count(*) = 3) g') overflows;
select count(*) from (select i, count(*) from aggspill group by i,j,t having count(*) = 3) g;
select count(*) from (select a, avg(b), avg(c) from aggspill_numeric_avg group by a) g;
reset optimizer_force_multistage_agg;
-- Test the spilling of aggstates
-- with and without serial/deserial functions
-- with and without workfile compression
-- The transition type of numeric is internal, and hence it uses the serial/deserial functions when spilling
-- The transition value of integer is byref, and it does not have any serial/deserial function when spilling
CREATE TABLE hashagg_spill(col1 numeric, col2 int) DISTRIBUTED BY (col1);
INSERT INTO hashagg_spill SELECT id, 1 FROM generate_series(1,20000) id;
ANALYZE hashagg_spill;
SET statement_mem='1000kB';
SET gp_workfile_compression = OFF;
select overflows >= 1 from hashagg_spill.num_hashagg_overflows('explain analyze
SELECT avg(col2) col2 FROM hashagg_spill GROUP BY col1 HAVING(sum(col1)) < 0;') overflows;
SET gp_workfile_compression = ON;
select overflows >= 1 from hashagg_spill.num_hashagg_overflows('explain analyze
SELECT avg(col2) col2 FROM hashagg_spill GROUP BY col1 HAVING(sum(col1)) < 0;') overflows;
RESET statement_mem;
RESET gp_workfile_compression;
drop schema hashagg_spill cascade;