未验证 提交 94799e05 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7979 from taosdata/hotfix/TD-6640

[td-6640]fix stddev memory leak issue
......@@ -52,7 +52,7 @@ int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(SSqlObj *pSql);
void tscFreeRetrieveSup(void **param);
......
......@@ -48,6 +48,8 @@ struct SSqlInfo;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef void (*_freeSqlSupporter)(void **);
typedef struct SNewVgroupInfo {
int32_t vgId;
int8_t inUse;
......@@ -364,6 +366,7 @@ typedef struct SSqlObj {
__async_cb_func_t fp;
__async_cb_func_t fetchFp;
void *param;
_freeSqlSupporter freeParam;
int64_t stime;
uint32_t queryId;
void * pStream;
......
......@@ -2129,7 +2129,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
tscFreeRetrieveSup(pSub);
tscFreeRetrieveSup(&pSub->param);
taos_free_result(pSub);
}
......@@ -2215,10 +2215,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
}
}
static void destroySup(SFirstRoundQuerySup* pSup) {
taosArrayDestroyEx(pSup->pResult, freeInterResult);
taosArrayDestroy(pSup->pColsInfo);
tfree(pSup);
static void tscFreeFirstRoundSup(void **param) {
if (*param) {
SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*)*param;
taosArrayDestroyEx(pSup->pResult, freeInterResult);
taosArrayDestroy(pSup->pColsInfo);
tfree(*param);
}
}
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
......@@ -2232,8 +2235,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
int32_t code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
destroySup(pSup);
tscFreeFirstRoundSup(&param);
taos_free_result(pSql);
pParent->subState.numOfSub = 0;
tfree(pParent->pSubs);
pParent->res.code = code;
tscAsyncResultOnError(pParent);
return;
......@@ -2325,11 +2330,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tbufCloseWriter(&bw);
}
taosArrayDestroyEx(pSup->pResult, freeInterResult);
taosArrayDestroy(pSup->pColsInfo);
tfree(pSup);
tscFreeFirstRoundSup(&param);
taos_free_result(pSql);
pParent->subState.numOfSub = 0;
tfree(pParent->pSubs);
if (resRows == 0) {
pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
......@@ -2350,8 +2355,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
if (c != TSDB_CODE_SUCCESS) {
SSqlObj* parent = pSup->pParent;
destroySup(pSup);
tscFreeFirstRoundSup(&param);
taos_free_result(pSql);
parent->subState.numOfSub = 0;
tfree(parent->pSubs);
parent->res.code = c;
tscAsyncResultOnError(parent);
return;
......@@ -2374,6 +2381,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL);
SSqlCmd *pCmd = &pNew->cmd;
pNew->freeParam = tscFreeFirstRoundSup;
tscDebug("%"PRIx64 " add first round supporter:%p", pNew->self, pSup);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
......@@ -2502,11 +2513,21 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
tscNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
pSql->pSubs = calloc(1, POINTER_BYTES);
if (pSql->pSubs == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pSql->subState.numOfSub = 1;
pSql->pSubs[0] = pNew;
tscHandleMasterSTableQuery(pNew);
return TSDB_CODE_SUCCESS;
_error:
destroySup(pSup);
tscFreeFirstRoundSup((void**)&pSup);
taos_free_result(pNew);
pSql->res.code = terrno;
tscAsyncResultOnError(pSql);
......@@ -2697,16 +2718,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
void tscFreeRetrieveSup(SSqlObj *pSql) {
SRetrieveSupport *trsupport = pSql->param;
void tscFreeRetrieveSup(void **param) {
SRetrieveSupport *trsupport = *param;
void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0);
void* p = atomic_val_compare_exchange_ptr(param, trsupport, 0);
if (p == NULL) {
tscDebug("0x%"PRIx64" retrieve supp already released", pSql->self);
tscDebug("retrieve supp already released");
return;
}
tscDebug("0x%"PRIx64" start to free subquery supp obj:%p", pSql->self, trsupport);
tscDebug("start to free subquery restrieve supp obj:%p", trsupport);
tfree(trsupport->localBuffer);
tfree(trsupport);
}
......@@ -2779,12 +2800,12 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
// if failed to process sql, let following code handle the pSql
if (ret == TSDB_CODE_SUCCESS) {
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
taos_free_result(pSql);
return ret;
} else {
pParentSql->pSubs[trsupport->subqueryIndex] = pSql;
tscFreeRetrieveSup(pNew);
tscFreeRetrieveSup(&pNew->param);
taos_free_result(pNew);
return ret;
}
......@@ -2839,7 +2860,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d freed, not finished, total:%d", pParentSql->self,
pSql->self, trsupport->subqueryIndex, pState->numOfSub);
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
return;
}
......@@ -2849,7 +2870,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
// release allocated resource
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
......@@ -2861,7 +2882,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) {
tscFreeRetrieveSup(pParentSql);
(*pParentSql->freeParam)(&pParentSql->param);
}
tscFreeSubobj(userSql);
......@@ -2945,7 +2966,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not finished", pParentSql->self, pSql->self,
trsupport->subqueryIndex);
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
return;
}
......@@ -2975,7 +2996,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
pParentSql->res.row = 0;
pParentSql->res.numOfGroups = 0;
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
......
......@@ -3798,7 +3798,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
int32_t index = ps->subqueryIndex;
bool ret = subAndCheckDone(pSql, pParentSql, index);
tscFreeRetrieveSup(pSql);
tscFreeRetrieveSup(&pSql->param);
if (!ret) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册