diff --git a/src/backend/executor/execHHashagg.c b/src/backend/executor/execHHashagg.c index aa70fefbb9da9f156a0a48d0235b9513471627c5..56984e8b9b33185ac5227711fadf06e7313ff803 100644 --- a/src/backend/executor/execHHashagg.c +++ b/src/backend/executor/execHHashagg.c @@ -340,7 +340,10 @@ makeHashAggEntryForInput(AggState *aggstate, TupleTableSlot *inputslot, uint32 h if (GET_TOTAL_USED_SIZE(hashtable) + MAXALIGN(MAXALIGN(tup_len) + aggs_len) >= hashtable->max_mem) + { + MemoryContextSwitchTo(oldcxt); return NULL; + } /* * Form memtuple into group_buf. @@ -391,7 +394,11 @@ makeHashAggEntryForGroup(AggState *aggstate, void *tuple_and_aggs, copy_tuple_and_aggs = mpool_alloc(hashtable->group_buf, input_size); memcpy(copy_tuple_and_aggs, tuple_and_aggs, input_size); - oldcxt = MemoryContextSwitchTo(hashtable->entry_cxt); + /* + * The deserialized transValues are not in mpool, put them + * in a separate context and reset with mpool_reset + */ + oldcxt = MemoryContextSwitchTo(hashtable->serialization_cxt); entry = getEmptyHashAggEntry(aggstate); entry->hashvalue = hashvalue; @@ -809,6 +816,12 @@ create_agg_hash_table(AggState *aggstate) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + hashtable->serialization_cxt = AllocSetContextCreate(hashtable->entry_cxt, + "HashAggTableEntrySerializationlContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate); keywidth = est_hash_tuple_size(aggstate->ss.ss_ScanTupleSlot, aggstate->hash_needed); @@ -1404,6 +1417,9 @@ spill_hash_table(AggState *aggstate) /* Reset the buffer */ mpool_reset(hashtable->group_buf); + /* Reset serialization context */ + MemoryContextReset(hashtable->serialization_cxt); + /* Reset in-memory entries count */ hashtable->num_entries = 0; @@ -1571,6 +1587,7 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info, if (OidIsValid(peraggstate->serialfn_oid)) { FunctionCallInfoData fcinfo; + MemoryContext old_ctx; InitFunctionCallInfoData(fcinfo, &peraggstate->serialfn, @@ -1581,12 +1598,17 @@ writeHashEntry(AggState *aggstate, BatchFileInfo *file_info, fcinfo.arg[0] = pergroupstate->transValue; fcinfo.argnull[0] = pergroupstate->transValueIsNull; + /* Not necessary to do this if the serialization func has no memory leak */ + old_ctx = MemoryContextSwitchTo(aggstate->hhashtable->serialization_cxt); + serializedVal = FunctionCallInvoke(&fcinfo); datum_size = datumGetSize(serializedVal, byteaTranstypeByVal, byteaTranstypeLen); BufFileWriteOrError(file_info->wfile, DatumGetPointer(serializedVal), datum_size); pfree(DatumGetPointer(serializedVal)); + + MemoryContextSwitchTo(old_ctx); } /* If it's a ByRef, write the data to the file */ else if (!peraggstate->transtypeByVal) @@ -2316,6 +2338,8 @@ void reset_agg_hash_table(AggState *aggstate, int64 nentries) mpool_reset(hashtable->group_buf); + MemoryContextReset(hashtable->serialization_cxt); + init_agg_hash_iter(hashtable); Gpmon_ResetAggHashTable(aggstate); diff --git a/src/include/executor/execHHashagg.h b/src/include/executor/execHHashagg.h index 2ce05fcd579359828bc5191c94087586b0d0d678..a9f7cb861b8d69dd07dd9eb2b53d3e199abae7b2 100644 --- a/src/include/executor/execHHashagg.h +++ b/src/include/executor/execHHashagg.h @@ -168,6 +168,9 @@ typedef struct HashAggTable SpillFile *curr_spill_file; int curr_spill_level; + /* The memory context for (de)serialization */ + MemoryContext serialization_cxt; + /* * The space to buffer the free hash entries and AggStatePerGroups. Using this, * we can reduce palloc/pfree calls.