提交 777b4b06 编写于 作者: H hjxilinx

refactor and fix memory leaks

上级 114dd6ea
...@@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); ...@@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num); void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondSubquery(SSqlObj* pSql); int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
......
...@@ -22,20 +22,7 @@ ...@@ -22,20 +22,7 @@
#include "ttime.h" #include "ttime.h"
#include "tutil.h" #include "tutil.h"
static UNUSED_FUNC bool isSubqueryCompleted(SSqlObj* pSql) { static void freeSubqueryObj(SSqlObj* pSql);
bool hasData = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes* pRes = &pSql->pSubs[i]->res;
// in case inner join, if any subquery exhausted, query completed
if (pRes->numOfRows == 0) {
hasData = false;
break;
}
}
return hasData;
}
static bool doCompare(int32_t order, int64_t left, int64_t right) { static bool doCompare(int32_t order, int64_t left, int64_t right) {
if (order == TSQL_SO_ASC) { if (order == TSQL_SO_ASC) {
...@@ -235,7 +222,7 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { ...@@ -235,7 +222,7 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/* /*
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSubquerySupporter* pSupporter = NULL; SJoinSubquerySupporter* pSupporter = NULL;
...@@ -265,55 +252,56 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -265,55 +252,56 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub); pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
bool success = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pPrevSub = pSql->pSubs[i]; SSqlObj *pPrevSub = pSql->pSubs[i];
pSql->pSubs[i] = NULL;
pSupporter = pPrevSub->param; pSupporter = pPrevSub->param;
if (pSupporter->exprsInfo.numOfExprs == 0) { if (pSupporter->exprsInfo.numOfExprs == 0) {
tscTrace("%p subquery %d, not need to launch query, ignore it", pSql, i); tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
tscFreeSqlObj(pPrevSub); tscFreeSqlObj(pPrevSub);
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
continue; continue;
} }
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
STSBuf* pTSBuf = pSubQueryInfo->tsBuf; STSBuf *pTSBuf = pSubQueryInfo->tsBuf;
pSubQueryInfo->tsBuf = NULL; pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj
taos_free_result(pPrevSub); taos_free_result(pPrevSub);
// todo refactor to avoid the memory problem handling SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) { if (pNew == NULL) {
pSql->numOfSubs = i; // revise the number of subquery
pSupporter->pState->numOfTotal = i;
pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
return 0; success = false;
break;
} }
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object
// set the second stage sub query for join process // set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
pQueryInfo->nAggTimeInterval = pSupporter->interval; pQueryInfo->nAggTimeInterval = pSupporter->interval;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
/* /*
* if the first column of the secondary query is not ts function, add this function. * if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting. * Because this column is required to filter with timestamp after intersecting.
...@@ -321,43 +309,60 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -321,43 +309,60 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
} }
// todo refactor function name // todo refactor function name
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tscFieldInfoCalOffset(pNewQueryInfo); tscFieldInfoCalOffset(pNewQueryInfo);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
/* /*
* When handling the projection query, the offset value will be modified for table-table join, which is changed * When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection. * during the timestamp intersection.
*/ */
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
pNewQueryInfo->limit = pSupporter->limit; pNewQueryInfo->limit = pSupporter->limit;
// fetch the join tag column // fetch the join tag column
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin); assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid); int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex; pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
} }
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
}
//prepare the subqueries object failed, abort
if (!success) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
pSql->numOfSubs, pSql->res.code);
freeSubqueryObj(pSql);
return pSql->res.code;
}
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
tscProcessSql(pNew); tscProcessSql(pSub);
} }
return 0; return TSDB_CODE_SUCCESS;
} }
static void freeSubqueryObj(SSqlObj* pSql) { static void freeSubqueryObj(SSqlObj* pSql) {
...@@ -506,7 +511,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -506,7 +511,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
doQuitSubquery(pParentSql); doQuitSubquery(pParentSql);
} else { } else {
updateQueryTimeRange(pParentQueryInfo, st, et); updateQueryTimeRange(pParentQueryInfo, st, et);
tscLaunchSecondSubquery(pParentSql); tscLaunchSecondPhaseSubqueries(pParentSql);
} }
} }
} else { // failure of sub query } else { // failure of sub query
...@@ -549,7 +554,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -549,7 +554,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
if (finished >= numOfTotal) { if (finished >= numOfTotal) {
assert(finished == numOfTotal); assert(finished == numOfTotal);
tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal,
pParentSql->res.code); pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
...@@ -558,6 +563,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -558,6 +563,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} }
tsem_post(&pParentSql->rspSem); tsem_post(&pParentSql->rspSem);
} else {
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
} }
} }
} }
...@@ -730,7 +737,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -730,7 +737,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// // no qualified result // // no qualified result
// } // }
// //
// tscLaunchSecondSubquery(pSql, ts, num); // tscLaunchSecondPhaseSubqueries(pSql, ts, num);
// } else { // } else {
// } // }
...@@ -770,7 +777,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -770,7 +777,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
assert(finished == numOfTotal); assert(finished == numOfTotal);
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
/** /**
......
...@@ -452,8 +452,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -452,8 +452,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag);
tVariantDestroy(&pCtx->tag);
if (pCtx->tagInfo.pTagCtxList != NULL) { if (pCtx->tagInfo.pTagCtxList != NULL) {
tfree(pCtx->tagInfo.pTagCtxList); tfree(pCtx->tagInfo.pTagCtxList);
} }
......
...@@ -445,26 +445,12 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -445,26 +445,12 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->fp = NULL; pSql->fp = NULL;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
memset(pCmd->payload, 0, (size_t)pCmd->allocSize); memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload); tfree(pCmd->payload);
pCmd->allocSize = 0; pCmd->allocSize = 0;
// if (pRes->buffer != NULL) {
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
//
// for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) {
// if (pRes->buffer[i] != NULL) {
// printf("===========free:%p\n", pRes->buffer[i]);
// tfree(pRes->buffer[i]);
// }
// }
//
// tfree(pRes->buffer);
// }
if (pSql->fp == NULL) { if (pSql->fp == NULL) {
tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->emptyRspSem); tsem_destroy(&pSql->emptyRspSem);
......
...@@ -751,7 +751,7 @@ int isCommentLine(char *line) { ...@@ -751,7 +751,7 @@ int isCommentLine(char *line) {
void source_file(TAOS *con, char *fptr) { void source_file(TAOS *con, char *fptr) {
wordexp_t full_path; wordexp_t full_path;
int read_len = 0; int read_len = 0;
char * cmd = malloc(MAX_COMMAND_SIZE); char * cmd = calloc(1, MAX_COMMAND_SIZE);
size_t cmd_len = 0; size_t cmd_len = 0;
char * line = NULL; char * line = NULL;
size_t line_len = 0; size_t line_len = 0;
......
...@@ -163,9 +163,8 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t ...@@ -163,9 +163,8 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
void tVariantDestroy(tVariant *pVar) { void tVariantDestroy(tVariant *pVar) {
if (pVar == NULL) return; if (pVar == NULL) return;
if ((pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) && pVar->nLen > 0) { if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) {
free(pVar->pz); tfree(pVar->pz);
pVar->pz = NULL;
pVar->nLen = 0; pVar->nLen = 0;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册