diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 4d27325d7575a2c2704671ec9193e3a7ae7dbf3b..8cb48cc9f05df87c1636ed60c032ddadeac75d8e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -172,13 +172,8 @@ typedef struct tExprNode { void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); -typedef enum { - SHOULD_FREE_COLDATA = 0x1, // the newly created column data needs to be destroyed. - DELEGATED_MGMT_COLDATA = 0x2, // input column data should not be released. -} ECOLDATA_MGMT_TYPE_E; - struct SScalarParam { - ECOLDATA_MGMT_TYPE_E type; + bool colAlloced; SColumnInfoData *columnData; SHashObj *pHashFilter; int32_t hashValueType; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7b4127e5915f502c53910fe2795b506b30ef5b44..0b05bb72247efb5e3a108ba921b24dda35c9c82e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3186,6 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 *suid = 0; if (mr.me.type == TSDB_CHILD_TABLE) { + tDecoderClear(&mr.coder); *suid = mr.me.ctbEntry.suid; code = metaGetTableEntryByUid(&mr, *suid); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index e53c9fae6f8a0f8feffde8397a42aa2cb3e334fe..a575e355f1dc7a7aa866bdeb7f548005af10d7c7 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -90,6 +90,7 @@ _return: tsem_post(&pInserter->ready); + taosMemoryFree(pMsg->pData); taosMemoryFree(param); return TSDB_CODE_SUCCESS; @@ -283,6 +284,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); taosArrayDestroy(pInserter->pDataBlocks); taosMemoryFree(pInserter->pSchema); + taosMemoryFree(pInserter->pParam); + taosHashCleanup(pInserter->pCols); taosThreadMutexDestroy(&pInserter->mutex); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e438a9a3ed9579c13fd5a30eadb882fa9d41f061..c3d9f66479f3a277d8a855271b63303f7867d26b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -624,7 +624,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; ASSERT(pResult->info.capacity > 0); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); - + colDataDestroy(&idata); + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { @@ -679,6 +680,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; ASSERT(pResult->info.capacity > 0); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); + colDataDestroy(&idata); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c16c3e39371dd23683fc15402bbb9f9887209c25..6ad552576c3a7cf5b584d5cc6143a69ceace32d0 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -855,6 +855,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); + output->colAlloced = true; return 0; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 2636b7ebe0afe28afa441260ac2627c8ce76332d..1dc3db033bcfb0905b1552bfc35a7310f976d494 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -952,6 +952,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { SQueryInserterNode* pSink = (SQueryInserterNode*)pNode; destroyDataSinkNode((SDataSinkNode*)pSink); + nodesDestroyList(pSink->pCols); break; } case QUERY_NODE_PHYSICAL_PLAN_DELETE: { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 484d95cb5a5532369d9380de1324bce15a5e0fb3..14406a26ed167ac8014a41a97dc0ca04f82f75ce 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -55,7 +55,7 @@ int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarPara } pParam->columnData = pColumnData; - pParam->type = SHOULD_FREE_COLDATA; + pParam->colAlloced = true; return TSDB_CODE_SUCCESS; } @@ -166,6 +166,10 @@ void sclFreeRes(SHashObj *res) { } void sclFreeParam(SScalarParam *param) { + if (!param->colAlloced) { + return; + } + if (param->columnData != NULL) { colDataDestroy(param->columnData); taosMemoryFreeClear(param->columnData); @@ -173,6 +177,7 @@ void sclFreeParam(SScalarParam *param) { if (param->pHashFilter != NULL) { taosHashCleanup(param->pHashFilter); + param->pHashFilter = NULL; } } @@ -191,6 +196,19 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) { return TSDB_CODE_SUCCESS; } +void sclFreeParamList(SScalarParam *param, int32_t paramNum) { + if (NULL == param) { + return; + } + + for (int32_t i = 0; i < paramNum; ++i) { + SScalarParam* p = param + i; + sclFreeParam(p); + } + + taosMemoryFree(param); +} + int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) { switch (nodeType(node)) { case QUERY_NODE_LEFT_VALUE: { @@ -225,11 +243,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, type)); param->hashValueType = type; + param->colAlloced = true; if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { taosHashCleanup(param->pHashFilter); + param->pHashFilter = NULL; sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); return TSDB_CODE_QRY_OUT_OF_MEMORY; } + param->colAlloced = false; break; } case QUERY_NODE_COLUMN: { @@ -274,6 +295,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } *param = *res; + param->colAlloced = false; break; } default: @@ -455,11 +477,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp _return: - for (int32_t i = 0; i < paramNum; ++i) { -// sclFreeParamNoData(params + i); - } - - taosMemoryFreeClear(params); + sclFreeParamList(params, paramNum); SCL_RET(code); } @@ -533,11 +551,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o _return: - for (int32_t i = 0; i < paramNum; ++i) { -// sclFreeParamNoData(params + i); - } - - taosMemoryFreeClear(params); + sclFreeParamList(params, paramNum); SCL_RET(code); } @@ -573,14 +587,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp code = terrno; _return: - for (int32_t i = 0; i < paramNum; ++i) { - if (params[i].type == SHOULD_FREE_COLDATA) { - colDataDestroy(params[i].columnData); - taosMemoryFreeClear(params[i].columnData); - } - } - taosMemoryFreeClear(params); + sclFreeParamList(params, paramNum); SCL_RET(code); } @@ -871,7 +879,6 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { return DEAL_RES_ERROR; } - output.type = DELEGATED_MGMT_COLDATA; if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; @@ -906,7 +913,6 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) { return DEAL_RES_ERROR; } - output.type = DELEGATED_MGMT_COLDATA; if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index d8c64eef55cf0452946184439264ca66ef925ec9..4c4d03fb37464bf8d9fdd681782011d9a5e10df6 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -207,7 +207,7 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) { void initScalarParam(SScalarParam* pParam) { memset(pParam, 0, sizeof(SScalarParam)); - pParam->type = SHOULD_FREE_COLDATA; + pParam->colAlloced = true; } } diff --git a/tests/script/tsim/valgrind/basic1.sim b/tests/script/tsim/valgrind/basic1.sim index 3e39f35fa740ebce4b3359b8d27d33451c790ca2..3c47cae5cc907c569295243cedb98b7bfd8f18e4 100644 --- a/tests/script/tsim/valgrind/basic1.sim +++ b/tests/script/tsim/valgrind/basic1.sim @@ -21,12 +21,11 @@ sql create table tb4 using st1 tags(4); sql insert into tb4 select * from tb1; -goto _OVER - sql select * from tb4; if $rows != 2 then return -1 endi + sql insert into tb4 select ts,f1,f2 from st1; sql select * from tb4; if $rows != 6 then @@ -59,4 +58,4 @@ endi if $system_content == $null then return -1 -endi \ No newline at end of file +endi