From f780ca114612cd2ebf630a5616d71eddede7362a Mon Sep 17 00:00:00 2001 From: Adam Lee Date: Wed, 10 Jul 2019 16:40:01 +0800 Subject: [PATCH] Create a sub memory context for serialization functions And reset it afterwards to make sure no memory is leaked there. For instance, the deserialized transValues for new entries (which are not short-lived) are not in group_buf and are not freed. Co-authored-by: Ning Yu --- src/backend/executor/execHHashagg.c | 26 +++++++++++++++++++++++++- src/include/executor/execHHashagg.h | 3 +++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/backend/executor/execHHashagg.c b/src/backend/executor/execHHashagg.c index aa70fefbb9..56984e8b9b 100644 --- a/src/backend/executor/execHHashagg.c +++ b/src/backend/executor/execHHashagg.c @@ -340,7 +340,10 @@ makeHashAggEntryForInput(AggState *aggstate, TupleTableSlot *inputslot, uint32 h if (GET_TOTAL_USED_SIZE(hashtable) + MAXALIGN(MAXALIGN(tup_len) + aggs_len) >= hashtable->max_mem) + { + MemoryContextSwitchTo(oldcxt); return NULL; + } /* * Form memtuple into group_buf. 
@@ -391,7 +394,11 @@ makeHashAggEntryForGroup(AggState *aggstate, void *tuple_and_aggs, copy_tuple_and_aggs = mpool_alloc(hashtable->group_buf, input_size); memcpy(copy_tuple_and_aggs, tuple_and_aggs, input_size); - oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); + /* + * The deserialized transValues are not in mpool, put them + * in a separate context and reset with mpool_reset + */ + oldcxt = MemoryContextSwitchTo(hashtable->serialization_cxt); entry = getEmptyHashAggEntry(aggstate); entry->hashvalue = hashvalue; @@ -809,6 +816,12 @@ create_agg_hash_table(AggState *aggstate) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + hashtable->serialization_cxt = AllocSetContextCreate(hashtable->entry_cxt, + "HashAggTableEntrySerializationlContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate); keywidth = est_hash_tuple_size(aggstate->ss.ss_ScanTupleSlot, aggstate->hash_needed); @@ -1404,6 +1417,9 @@ spill_hash_table(AggState *aggstate) /* Reset the buffer */ mpool_reset(hashtable->group_buf); + /* Reset serialization context */ + MemoryContextReset(hashtable->serialization_cxt); + /* Reset in-memory entries count */ hashtable->num_entries = 0; @@ -1571,6 +1587,7 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info, if (OidIsValid(peraggstate->serialfn_oid)) { FunctionCallInfoData fcinfo; + MemoryContext old_ctx; InitFunctionCallInfoData(fcinfo, &peraggstate->serialfn, @@ -1581,12 +1598,17 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info, fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; + /* Not necessary to do this if the serialization func has no memory leak */ + old_ctx = MemoryContextSwitchTo(aggstate->hhashtable->serialization_cxt); + serializedVal = FunctionCallInvoke(&fcinfo); datum_size = datumGetSize(serializedVal, byteaTranstypeByVal, byteaTranstypeLen); 
BufFileWriteOrError(file_info->wfile, DatumGetPointer(serializedVal), datum_size); pfree(DatumGetPointer(serializedVal)); + + MemoryContextSwitchTo(old_ctx); } /* If it's a ByRef, write the data to the file */ else if (!peraggstate->transtypeByVal) @@ -2316,6 +2338,8 @@ void reset_agg_hash_table(AggState *aggstate, int64 nentries) mpool_reset(hashtable->group_buf); + MemoryContextReset(hashtable->serialization_cxt); + init_agg_hash_iter(hashtable); Gpmon_ResetAggHashTable(aggstate); diff --git a/src/include/executor/execHHashagg.h b/src/include/executor/execHHashagg.h index 2ce05fcd57..a9f7cb861b 100644 --- a/src/include/executor/execHHashagg.h +++ b/src/include/executor/execHHashagg.h @@ -168,6 +168,9 @@ typedef struct HashAggTable SpillFile *curr_spill_file; int curr_spill_level; + /* The memory context for (de)serialization */ + MemoryContext serialization_cxt; + /* * The space to buffer the free hash entries and AggStatePerGroups. Using this, * we can reduce palloc/pfree calls. -- GitLab