提交 38f354aa 编写于 作者: H Heikki Linnakangas

Replace custom expandable buffer implementation with StringInfo.

Simpler that way.
上级 232ecfc3
......@@ -47,9 +47,6 @@
#include "utils/syscache.h"
#include "utils/tuplestorenew.h"
/* The initial size (in bytes) for an entry in the frame buffer. */
#define FRAMEBUFFER_ENTRY_SIZE 1024
/* Check for aggregate functions that have only transition functions,
* but not inverse preliminary functions or preliminary functions.
*/
......@@ -237,11 +234,8 @@ typedef struct WindowStatePerLevelData
*
* Use this pre-allocated buffer to avoid doing
* palloc/pfree many times.
*
* The size of this array is specified by 'max_size'.
*/
char *serial_array;
Size max_size;
StringInfoData serial_buf;
} WindowStatePerLevelData;
/*
......@@ -803,62 +797,20 @@ freeFrameBuffers(WindowState * wstate)
}
/*
* ensureSpace -- ensure that there is enough space in the buffer.
*
* This function returns the new written position if the array is
* entended.
* serializeValue -- serialize a given value (Datum) into a StringInfo
*/
static inline char *
ensureSpace(char **p_serial_entry, Size * p_max_size,
char *written_pos, Size written_len)
{
Size current_len = written_pos - *p_serial_entry;
while (current_len + written_len > *p_max_size)
{
*p_max_size += FRAMEBUFFER_ENTRY_SIZE;
*p_serial_entry = repalloc(*p_serial_entry, *p_max_size);
}
return (*p_serial_entry + current_len);
}
/*
* serializeValue -- serialize a given value (Datum) into a char array.
*
* The start writing position in the given char array is specified by
* 'start_pos'. If this value is too big to fit into the available
* space in the array, the size of the array is increased in this
* function.
*
* The return pointer points to the next char position in the
* given char array after writing the serialized string of the
* given value.
*/
static char *
static void
serializeValue(Datum value, bool isnull,
bool byvalue, int16 typelen,
char **p_serial_entry, Size * p_max_size, Size * p_len,
Size start_pos)
StringInfo serial_buf)
{
char *written_pos = (*p_serial_entry) + start_pos;
written_pos = ensureSpace(p_serial_entry, p_max_size,
written_pos, sizeof(bool));
memcpy(written_pos, &isnull, sizeof(bool));
written_pos += sizeof(bool); /* isnull col */
/* isnull col */
appendStringInfoChar(serial_buf, isnull);
if (!isnull)
{
if (byvalue)
{
written_pos = ensureSpace(p_serial_entry, p_max_size,
written_pos, sizeof(Datum));
memcpy(written_pos, &value, sizeof(Datum));
written_pos += sizeof(Datum);
}
appendBinaryStringInfo(serial_buf, &value, sizeof(Datum));
else
{
Size real_size = datumGetSize(value, byvalue, typelen);
......@@ -868,22 +820,15 @@ serializeValue(Datum value, bool isnull,
* will not make a copy for this Datum, but simply convert this
* pointer address to a Datum.
*/
Size alignmentBytes = ((char *)MAXALIGN(written_pos)) - written_pos;
written_pos = ensureSpace(p_serial_entry, p_max_size,
written_pos, real_size + alignmentBytes);
written_pos += alignmentBytes;
Size alignmentBytes = MAXALIGN(serial_buf->len) - serial_buf->len;
Assert(DatumGetPointer(value) != NULL);
memcpy(written_pos,
DatumGetPointer(value),
real_size);
appendStringInfoFill(serial_buf, alignmentBytes, '\0');
written_pos += real_size;
appendBinaryStringInfo(serial_buf,
DatumGetPointer(value),
real_size);
}
}
return written_pos;
}
/*
......@@ -893,11 +838,9 @@ serializeValue(Datum value, bool isnull,
static void
serializeFuncs(WindowStatePerLevel level_state,
ExprContext *econtext,
char **p_serial_entry, Size * p_max_size, Size * p_len,
Size start_pos)
StringInfo serial_buf)
{
ListCell *lc;
char *written_pos = (*p_serial_entry) + start_pos;
int serial_index = -1;
/* the value for each function. */
......@@ -919,12 +862,10 @@ serializeFuncs(WindowStatePerLevel level_state,
get_typlenbyval(CUME_DIST_PRELIM_TYPE,
&typelen, &byvalue);
written_pos =
serializeValue(funcstate->win_value,
funcstate->win_value_is_null,
byvalue, typelen,
p_serial_entry, p_max_size,
p_len, (written_pos - *p_serial_entry));
serializeValue(funcstate->win_value,
funcstate->win_value_is_null,
byvalue, typelen,
serial_buf);
}
else if (IS_LEAD_LAG(funcstate->wrxstate->winkind) ||
IS_FIRST_LAST(funcstate->wrxstate->winkind))
......@@ -937,13 +878,11 @@ serializeFuncs(WindowStatePerLevel level_state,
nargs = list_length(wrxstate->args);
Assert(2 <= nargs && nargs <= 4);
written_pos =
serializeValue(funcstate->aggTransValue,
funcstate->aggTransValueIsNull,
wrxstate->argtypbyval[argno],
wrxstate->argtyplen[argno],
p_serial_entry, p_max_size,
p_len, (written_pos - *p_serial_entry));
serializeValue(funcstate->aggTransValue,
funcstate->aggTransValueIsNull,
wrxstate->argtypbyval[argno],
wrxstate->argtyplen[argno],
serial_buf);
}
/*
......@@ -958,20 +897,13 @@ serializeFuncs(WindowStatePerLevel level_state,
* preliminary function.
*/
if (OidIsValid(funcstate->invprelimfn_oid))
{
written_pos = ensureSpace(p_serial_entry, p_max_size,
written_pos, sizeof(uint64));
memcpy(written_pos, &funcstate->numNotNulls, sizeof(uint64));
written_pos += sizeof(uint64);
}
appendBinaryStringInfo(serial_buf, &funcstate->numNotNulls, sizeof(uint64));
written_pos =
serializeValue(funcstate->aggTransValue,
funcstate->aggTransValueIsNull,
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
p_serial_entry, p_max_size,
p_len, (written_pos - *p_serial_entry));
serializeValue(funcstate->aggTransValue,
funcstate->aggTransValueIsNull,
funcstate->aggTranstypeByVal,
funcstate->aggTranstypeLen,
serial_buf);
}
else
{
......@@ -997,20 +929,15 @@ serializeFuncs(WindowStatePerLevel level_state,
value = ExecEvalExpr(argstate, econtext, &isnull, NULL);
MemoryContextSwitchTo(oldctx);
written_pos =
serializeValue(value, isnull,
byval, arglen,
p_serial_entry, p_max_size,
p_len, (written_pos - *p_serial_entry));
serializeValue(value, isnull,
byval, arglen,
serial_buf);
}
}
serial_index++;
funcstate->serial_index = serial_index;
}
*p_len = written_pos - *p_serial_entry;
Assert(*p_len <= *p_max_size);
}
/*
......@@ -1176,24 +1103,18 @@ deserializeFuncs(WindowStatePerLevel level_state,
static void
serializeEntry(WindowStatePerLevel level_state,
ExprContext *econtext,
char **p_serial_entry,
Size * p_max_size,
Size * p_len)
StringInfo serial_buf)
{
int key_no;
char *written_pos;
TupleTableSlot *slot = econtext->ecxt_outertuple;
*p_len = 0;
Assert(*p_serial_entry != NULL);
resetStringInfo(serial_buf);
/* We rely on the address of the char array is maxaligned. */
Assert(*p_serial_entry == (char *)MAXALIGN(*p_serial_entry));
written_pos = *p_serial_entry;
Assert(serial_buf->len == MAXALIGN(serial_buf->len));
if (!level_state->is_rows)
{
int key_no;
TupleTableSlot *slot = econtext->ecxt_outertuple;
/* Copy the keys */
for (key_no = 0; key_no < level_state->numSortCols; key_no++)
{
......@@ -1203,49 +1124,17 @@ serializeEntry(WindowStatePerLevel level_state,
key = slot_getattr(slot, attnum, &isnull);
written_pos = ensureSpace(p_serial_entry, p_max_size, written_pos,
sizeof(bool));
memcpy(written_pos, &(isnull), sizeof(bool));
written_pos += sizeof(bool);
if (!isnull)
{
Size keylen = 0;
Size alignmentBytes = 0;
if (level_state->col_typbyvals[key_no])
{
keylen = sizeof(Datum);
}
else
{
alignmentBytes = ((char *)MAXALIGN(written_pos)) - written_pos;
keylen = datumGetSize(key, level_state->col_typbyvals[key_no],
level_state->col_typlens[key_no]);
}
written_pos = ensureSpace(p_serial_entry, p_max_size, written_pos,
keylen + alignmentBytes);
written_pos += alignmentBytes;
if (level_state->col_typbyvals[key_no])
memcpy(written_pos, &(key), keylen);
else
memcpy(written_pos, DatumGetPointer(key), keylen);
written_pos += keylen;
}
serializeValue(key, isnull,
level_state->col_typbyvals[key_no],
level_state->col_typlens[key_no],
serial_buf);
}
}
/* Copy function values */
serializeFuncs(level_state,
econtext,
p_serial_entry,
p_max_size,
p_len,
(written_pos - *p_serial_entry));
serial_buf);
}
/*
......@@ -1454,16 +1343,17 @@ appendToFrameBuffer(WindowStatePerLevel level_state,
WindowState * wstate,
bool last_peer)
{
Size len;
WindowFrameBuffer buffer = level_state->frame_buffer;
ExprContext *econtext = wstate->ps.ps_ExprContext;
Assert(buffer->is_rows == level_state->is_rows);
MemSet(level_state->serial_array, 0, level_state->max_size);
resetStringInfo(&level_state->serial_buf);
serializeEntry(level_state, econtext,
&(level_state->serial_array), &(level_state->max_size), &len);
&(level_state->serial_buf));
ntuplestore_acc_put_data(buffer->writer, (void *)(level_state->serial_array), len);
ntuplestore_acc_put_data(buffer->writer,
(void *)(level_state->serial_buf.data),
level_state->serial_buf.len);
adjustEdgesAfterAppend(level_state, wstate, last_peer);
......@@ -3002,8 +2892,7 @@ makeWindowState(Window * window, EState *estate)
for (int level = 0; level < wstate->numlevels; level++)
{
WindowStatePerLevel level_state = &wstate->level_state[level];
level_state->serial_array = palloc0(FRAMEBUFFER_ENTRY_SIZE);
level_state->max_size = FRAMEBUFFER_ENTRY_SIZE;
initStringInfo(&level_state->serial_buf);
}
}
else
......@@ -5994,7 +5883,6 @@ windowBufferNextLastAgg(WindowBufferCursor cursor)
bool found;
ExprContext *econtext = wstate->ps.ps_ExprContext;
TupleTableSlot *slot = econtext->ecxt_outertuple;
Size len;
found = ntuplestore_acc_current_tupleslot(wstate->input_buffer->writer,
wstate->spare);
......@@ -6008,10 +5896,12 @@ windowBufferNextLastAgg(WindowBufferCursor cursor)
* memory.
*/
econtext->ecxt_outertuple = wstate->spare;
MemSet(level_state->serial_array, 0, level_state->max_size);
resetStringInfo(&level_state->serial_buf);
serializeEntry(level_state, econtext,
&(level_state->serial_array), &(level_state->max_size), &len);
entry = deserializeEntry(level_state, entry, level_state->serial_array, len);
&level_state->serial_buf);
entry = deserializeEntry(level_state, entry,
level_state->serial_buf.data,
level_state->serial_buf.len);
/* Get back the slot. */
econtext->ecxt_outertuple = slot;
/* We never use this again as it's the logical last row. */
......@@ -6190,10 +6080,10 @@ ExecEndWindow(WindowState * node)
freeFrameBufferEntry(level_state->lead_entry_buf);
level_state->lead_entry_buf = NULL;
if (level_state->serial_array != NULL)
if (level_state->serial_buf.data != NULL)
{
pfree(level_state->serial_array);
level_state->serial_array = NULL;
pfree(level_state->serial_buf.data);
level_state->serial_buf.data = NULL;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册