提交 f780ca11 编写于 作者: A Adam Lee 提交者: Adam Lee

Create a sub memory context for serialization functions

And reset it after to make sure no memory leaks there.

For instance, the deserialized transValues for new entries (not that
temporary) are not in group_buf, and not freed.
Co-authored-by: NNing Yu <nyu@pivotal.io>
上级 c9258ae3
......@@ -340,7 +340,10 @@ makeHashAggEntryForInput(AggState *aggstate, TupleTableSlot *inputslot, uint32 h
if (GET_TOTAL_USED_SIZE(hashtable) + MAXALIGN(MAXALIGN(tup_len) + aggs_len) >=
hashtable->max_mem)
{
MemoryContextSwitchTo(oldcxt);
return NULL;
}
/*
* Form memtuple into group_buf.
......@@ -391,7 +394,11 @@ makeHashAggEntryForGroup(AggState *aggstate, void *tuple_and_aggs,
copy_tuple_and_aggs = mpool_alloc(hashtable->group_buf, input_size);
memcpy(copy_tuple_and_aggs, tuple_and_aggs, input_size);
oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt);
/*
* The deserialized transValues are not in mpool, put them
* in a separate context and reset with mpool_reset
*/
oldcxt = MemoryContextSwitchTo(hashtable->serialization_cxt);
entry = getEmptyHashAggEntry(aggstate);
entry->hashvalue = hashvalue;
......@@ -809,6 +816,12 @@ create_agg_hash_table(AggState *aggstate)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
hashtable->serialization_cxt = AllocSetContextCreate(hashtable->entry_cxt,
"HashAggTableEntrySerializationlContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate);
keywidth = est_hash_tuple_size(aggstate->ss.ss_ScanTupleSlot, aggstate->hash_needed);
......@@ -1404,6 +1417,9 @@ spill_hash_table(AggState *aggstate)
/* Reset the buffer */
mpool_reset(hashtable->group_buf);
/* Reset serialization context */
MemoryContextReset(hashtable->serialization_cxt);
/* Reset in-memory entries count */
hashtable->num_entries = 0;
......@@ -1571,6 +1587,7 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
if (OidIsValid(peraggstate->serialfn_oid))
{
FunctionCallInfoData fcinfo;
MemoryContext old_ctx;
InitFunctionCallInfoData(fcinfo,
&peraggstate->serialfn,
......@@ -1581,12 +1598,17 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info,
fcinfo.arg[0] = pergroupstate->transValue;
fcinfo.argnull[0] = pergroupstate->transValueIsNull;
/* Not necessary to do this if the serialization func has no memory leak */
old_ctx = MemoryContextSwitchTo(aggstate->hhashtable->serialization_cxt);
serializedVal = FunctionCallInvoke(&fcinfo);
datum_size = datumGetSize(serializedVal, byteaTranstypeByVal, byteaTranstypeLen);
BufFileWriteOrError(file_info->wfile,
DatumGetPointer(serializedVal), datum_size);
pfree(DatumGetPointer(serializedVal));
MemoryContextSwitchTo(old_ctx);
}
/* If it's a ByRef, write the data to the file */
else if (!peraggstate->transtypeByVal)
......@@ -2316,6 +2338,8 @@ void reset_agg_hash_table(AggState *aggstate, int64 nentries)
mpool_reset(hashtable->group_buf);
MemoryContextReset(hashtable->serialization_cxt);
init_agg_hash_iter(hashtable);
Gpmon_ResetAggHashTable(aggstate);
......
......@@ -168,6 +168,9 @@ typedef struct HashAggTable
SpillFile *curr_spill_file;
int curr_spill_level;
/* The memory context for (de)serialization */
MemoryContext serialization_cxt;
/*
* The space to buffer the free hash entries and AggStatePerGroups. Using this,
* we can reduce palloc/pfree calls.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册