diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscJoinProcess.h index 4d21c28af68ca8d1ca1d894123367160a8fdd176..34764e4db62469af14592a026015c88b53a03fa5 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscJoinProcess.h @@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num); void tscSetupOutputColumnIndex(SSqlObj* pSql); -int32_t tscLaunchSecondSubquery(SSqlObj* pSql); +int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 0e93eac0cca99dc1373b504764a3a9658a3104e3..fcf46bb75465b65fb2790d88c2e0d171bfbb8156 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -22,20 +22,7 @@ #include "ttime.h" #include "tutil.h" -static UNUSED_FUNC bool isSubqueryCompleted(SSqlObj* pSql) { - bool hasData = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlRes* pRes = &pSql->pSubs[i]->res; - - // in case inner join, if any subquery exhausted, query completed - if (pRes->numOfRows == 0) { - hasData = false; - break; - } - } - - return hasData; -} +static void freeSubqueryObj(SSqlObj* pSql); static bool doCompare(int32_t order, int64_t left, int64_t right) { if (order == TSQL_SO_ASC) { @@ -235,7 +222,7 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { /* * launch secondary stage query to fetch the result that contains timestamp in set */ -int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { +int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSubquerySupporter* pSupporter = NULL; @@ -265,55 +252,56 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { pState->numOfTotal = pSql->numOfSubs; pState->numOfCompleted = (pSql->numOfSubs - numOfSub); + bool success = true; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pPrevSub = pSql->pSubs[i]; + SSqlObj *pPrevSub = pSql->pSubs[i]; + pSql->pSubs[i] = NULL; + pSupporter = pPrevSub->param; - + if (pSupporter->exprsInfo.numOfExprs == 0) { - tscTrace("%p subquery %d, not need to launch query, ignore it", pSql, i); - + tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i); + tscDestroyJoinSupporter(pSupporter); tscFreeSqlObj(pPrevSub); - + pSql->pSubs[i] = NULL; continue; } - SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); - STSBuf* pTSBuf = pSubQueryInfo->tsBuf; + SQueryInfo *pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0); + STSBuf *pTSBuf = pSubQueryInfo->tsBuf; pSubQueryInfo->tsBuf = NULL; + // free result for async object will also free sqlObj taos_free_result(pPrevSub); - // todo refactor to avoid the memory problem handling - SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { - pSql->numOfSubs = i; // revise the number of subquery - pSupporter->pState->numOfTotal = i; - - pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscDestroyJoinSupporter(pSupporter); - return 0; + success = false; + break; } - + tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object - + // set the second stage sub query for join process pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; - + pQueryInfo->nAggTimeInterval = pSupporter->interval; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; - + tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); - + tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); - + /* * if the first column of the secondary query is not ts function, add this function. * Because this column is required to filter with timestamp after intersecting. @@ -321,43 +309,60 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); } - + // todo refactor function name - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); - + tscFieldInfoCalOffset(pNewQueryInfo); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0); - + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0); + /* * When handling the projection query, the offset value will be modified for table-table join, which is changed * during the timestamp intersection. */ pSupporter->limit = pQueryInfo->limit; pNewQueryInfo->limit = pSupporter->limit; - + // fetch the join tag column if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { - SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0); + SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); assert(pQueryInfo->tagCond.joinInfo.hasJoin); - + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid); pExpr->param[0].i64Key = tagColIndex; pExpr->numOfParams = 1; } - + tscPrintSelectClause(pNew, 0); tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); + } + + //prepare the subqueries object failed, abort + if (!success) { + pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; + tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, + pSql->numOfSubs, pSql->res.code); + freeSubqueryObj(pSql); + + return pSql->res.code; + } + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } - tscProcessSql(pNew); + tscProcessSql(pSub); } - return 0; + return TSDB_CODE_SUCCESS; } static void freeSubqueryObj(SSqlObj* pSql) { @@ -506,7 +511,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { doQuitSubquery(pParentSql); } else { updateQueryTimeRange(pParentQueryInfo, st, et); - tscLaunchSecondSubquery(pParentSql); + tscLaunchSecondPhaseSubqueries(pParentSql); } } } else { // failure of sub query @@ -549,7 +554,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { if (finished >= numOfTotal) { assert(finished == numOfTotal); - tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, + tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal, pParentSql->res.code); if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { @@ -558,6 +563,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } tsem_post(&pParentSql->rspSem); + } else { + tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); } } } @@ -730,7 +737,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // // no qualified result // } // - // tscLaunchSecondSubquery(pSql, ts, num); + // tscLaunchSecondPhaseSubqueries(pSql, ts, num); // } else { // } @@ -770,7 +777,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { assert(finished == numOfTotal); tscSetupOutputColumnIndex(pParentSql); - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); /** diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 0038f058ddde53839cd78e3624d8676b415e2da3..66416253e1ee63ba3422a3c8c547db2d5a88d8ca 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -452,8 +452,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { if (pLocalReducer->pCtx != NULL) { for(int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i]; - tVariantDestroy(&pCtx->tag); + tVariantDestroy(&pCtx->tag); if (pCtx->tagInfo.pTagCtxList != NULL) { tfree(pCtx->tagInfo.pTagCtxList); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 07cc3169e283af5170d7280fd4b2ef4a0b1bda4a..cf3f30fcd9e5fa9cecdd68335f5d740b84f305d7 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -445,26 +445,12 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->fp = NULL; SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; memset(pCmd->payload, 0, (size_t)pCmd->allocSize); tfree(pCmd->payload); pCmd->allocSize = 0; -// if (pRes->buffer != NULL) { -// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); -// -// for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; i++) { -// if (pRes->buffer[i] != NULL) { -// printf("===========free:%p\n", pRes->buffer[i]); -// tfree(pRes->buffer[i]); -// } -// } -// -// tfree(pRes->buffer); -// } - if (pSql->fp == NULL) { tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->emptyRspSem); diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 1ca969a4682f99eb0ca8329eb03bd3c06a8f2438..8d75d3f6c7faca76b39c4f54d201678a91070c00 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -751,7 +751,7 @@ int isCommentLine(char *line) { void source_file(TAOS *con, char *fptr) { wordexp_t full_path; int read_len = 0; - char * cmd = malloc(MAX_COMMAND_SIZE); + char * cmd = calloc(1, MAX_COMMAND_SIZE); size_t cmd_len = 0; char * line = NULL; size_t line_len = 0; diff --git a/src/util/src/ttypes.c b/src/util/src/ttypes.c index b048748d95926a3d566b7dc82aecdd6dcc936eae..0902bb1c0d841b28a6f96cece7443e1026a4cdf2 100644 --- a/src/util/src/ttypes.c +++ b/src/util/src/ttypes.c @@ -163,9 +163,8 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t void tVariantDestroy(tVariant *pVar) { if (pVar == NULL) return; - if ((pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) && pVar->nLen > 0) { - free(pVar->pz); - pVar->pz = NULL; + if (pVar->nType == TSDB_DATA_TYPE_BINARY || pVar->nType == TSDB_DATA_TYPE_NCHAR) { + tfree(pVar->pz); pVar->nLen = 0; } }