未验证 提交 077fbd67 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #15010 from taosdata/fix/TD-17460

fix: fix taosd mem leak
...@@ -172,13 +172,8 @@ typedef struct tExprNode { ...@@ -172,13 +172,8 @@ typedef struct tExprNode {
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
typedef enum {
SHOULD_FREE_COLDATA = 0x1, // the newly created column data needs to be destroyed.
DELEGATED_MGMT_COLDATA = 0x2, // input column data should not be released.
} ECOLDATA_MGMT_TYPE_E;
struct SScalarParam { struct SScalarParam {
ECOLDATA_MGMT_TYPE_E type; bool colAlloced;
SColumnInfoData *columnData; SColumnInfoData *columnData;
SHashObj *pHashFilter; SHashObj *pHashFilter;
int32_t hashValueType; int32_t hashValueType;
......
...@@ -3186,6 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3186,6 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
*suid = 0; *suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) { if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
*suid = mr.me.ctbEntry.suid; *suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid); code = metaGetTableEntryByUid(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -90,6 +90,7 @@ _return: ...@@ -90,6 +90,7 @@ _return:
tsem_post(&pInserter->ready); tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
taosMemoryFree(param); taosMemoryFree(param);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -283,6 +284,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { ...@@ -283,6 +284,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks); taosArrayDestroy(pInserter->pDataBlocks);
taosMemoryFree(pInserter->pSchema); taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex); taosThreadMutexDestroy(&pInserter->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -624,7 +624,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -624,7 +624,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0); ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows; numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList); taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
...@@ -679,6 +680,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc ...@@ -679,6 +680,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0); ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows; numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList); taosArrayDestroy(pBlockList);
......
...@@ -855,6 +855,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { ...@@ -855,6 +855,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
memcpy(output->columnData, memcpy(output->columnData,
taosArrayGet(input->pDataBlock, 0), taosArrayGet(input->pDataBlock, 0),
sizeof(SColumnInfoData)); sizeof(SColumnInfoData));
output->colAlloced = true;
return 0; return 0;
} }
......
...@@ -952,6 +952,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -952,6 +952,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode; SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink); destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyList(pSink->pCols);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
......
...@@ -55,7 +55,7 @@ int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarPara ...@@ -55,7 +55,7 @@ int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarPara
} }
pParam->columnData = pColumnData; pParam->columnData = pColumnData;
pParam->type = SHOULD_FREE_COLDATA; pParam->colAlloced = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -166,6 +166,10 @@ void sclFreeRes(SHashObj *res) { ...@@ -166,6 +166,10 @@ void sclFreeRes(SHashObj *res) {
} }
void sclFreeParam(SScalarParam *param) { void sclFreeParam(SScalarParam *param) {
if (!param->colAlloced) {
return;
}
if (param->columnData != NULL) { if (param->columnData != NULL) {
colDataDestroy(param->columnData); colDataDestroy(param->columnData);
taosMemoryFreeClear(param->columnData); taosMemoryFreeClear(param->columnData);
...@@ -173,6 +177,7 @@ void sclFreeParam(SScalarParam *param) { ...@@ -173,6 +177,7 @@ void sclFreeParam(SScalarParam *param) {
if (param->pHashFilter != NULL) { if (param->pHashFilter != NULL) {
taosHashCleanup(param->pHashFilter); taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
} }
} }
...@@ -191,6 +196,19 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) { ...@@ -191,6 +196,19 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
if (NULL == param) {
return;
}
for (int32_t i = 0; i < paramNum; ++i) {
SScalarParam* p = param + i;
sclFreeParam(p);
}
taosMemoryFree(param);
}
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) { int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
switch (nodeType(node)) { switch (nodeType(node)) {
case QUERY_NODE_LEFT_VALUE: { case QUERY_NODE_LEFT_VALUE: {
...@@ -225,11 +243,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t ...@@ -225,11 +243,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type)); SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
param->hashValueType = type; param->hashValueType = type;
param->colAlloced = true;
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pHashFilter); taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
param->colAlloced = false;
break; break;
} }
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
...@@ -274,6 +295,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t ...@@ -274,6 +295,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
*param = *res; *param = *res;
param->colAlloced = false;
break; break;
} }
default: default:
...@@ -455,11 +477,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp ...@@ -455,11 +477,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
_return: _return:
for (int32_t i = 0; i < paramNum; ++i) { sclFreeParamList(params, paramNum);
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
SCL_RET(code); SCL_RET(code);
} }
...@@ -533,11 +551,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o ...@@ -533,11 +551,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
_return: _return:
for (int32_t i = 0; i < paramNum; ++i) { sclFreeParamList(params, paramNum);
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
SCL_RET(code); SCL_RET(code);
} }
...@@ -573,14 +587,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp ...@@ -573,14 +587,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
code = terrno; code = terrno;
_return: _return:
for (int32_t i = 0; i < paramNum; ++i) {
if (params[i].type == SHOULD_FREE_COLDATA) {
colDataDestroy(params[i].columnData);
taosMemoryFreeClear(params[i].columnData);
}
}
taosMemoryFreeClear(params); sclFreeParamList(params, paramNum);
SCL_RET(code); SCL_RET(code);
} }
...@@ -871,7 +879,6 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) { ...@@ -871,7 +879,6 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
...@@ -906,7 +913,6 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) { ...@@ -906,7 +913,6 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
......
...@@ -207,7 +207,7 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) { ...@@ -207,7 +207,7 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
void initScalarParam(SScalarParam* pParam) { void initScalarParam(SScalarParam* pParam) {
memset(pParam, 0, sizeof(SScalarParam)); memset(pParam, 0, sizeof(SScalarParam));
pParam->type = SHOULD_FREE_COLDATA; pParam->colAlloced = true;
} }
} }
......
...@@ -21,12 +21,11 @@ sql create table tb4 using st1 tags(4); ...@@ -21,12 +21,11 @@ sql create table tb4 using st1 tags(4);
sql insert into tb4 select * from tb1; sql insert into tb4 select * from tb1;
goto _OVER
sql select * from tb4; sql select * from tb4;
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
sql insert into tb4 select ts,f1,f2 from st1; sql insert into tb4 select ts,f1,f2 from st1;
sql select * from tb4; sql select * from tb4;
if $rows != 6 then if $rows != 6 then
...@@ -59,4 +58,4 @@ endi ...@@ -59,4 +58,4 @@ endi
if $system_content == $null then if $system_content == $null then
return -1 return -1
endi endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册