diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index a012ca5a7fe741b8859465504cbc971a7e46952c..b6f0ec712c9bbd0d48b560a5e72768a021e2b74d 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -52,7 +52,7 @@ int tsInsertInitialCheck(SSqlObj *pSql); void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); -void tscFreeRetrieveSup(SSqlObj *pSql); +void tscFreeRetrieveSup(void **param); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3ab9e2f66faafc8d4bda0b6993c0070e0ec14ea6..de439e64036999da0c9f6142ace7ebe1b847adbc 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -48,6 +48,8 @@ struct SSqlInfo; typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); +typedef void (*_freeSqlSupporter)(void **); + typedef struct SNewVgroupInfo { int32_t vgId; int8_t inUse; @@ -364,6 +366,7 @@ typedef struct SSqlObj { __async_cb_func_t fp; __async_cb_func_t fetchFp; void *param; + _freeSqlSupporter freeParam; int64_t stime; uint32_t queryId; void * pStream; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8e139ebc2625ff283a2ae611e4ae6b9feac932a3..cba337cd1c7048acf76b77c614adecaa75bce0c9 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2129,7 +2129,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { SSqlObj* pSub = pSql->pSubs[i]; assert(pSub != NULL); - tscFreeRetrieveSup(pSub); + tscFreeRetrieveSup(&pSub->param); taos_free_result(pSub); } @@ -2215,10 +2215,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S } } -static void destroySup(SFirstRoundQuerySup* pSup) { - taosArrayDestroyEx(pSup->pResult, freeInterResult); - taosArrayDestroy(pSup->pColsInfo); - tfree(pSup); +static void tscFreeFirstRoundSup(void **param) { + if (*param) { + SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*)*param; + taosArrayDestroyEx(pSup->pResult, freeInterResult); + taosArrayDestroy(pSup->pColsInfo); + tfree(*param); + } } void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { @@ -2232,8 +2235,10 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { int32_t code = taos_errno(pSql); if (code != TSDB_CODE_SUCCESS) { - destroySup(pSup); + tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + pParent->subState.numOfSub = 0; + tfree(pParent->pSubs); pParent->res.code = code; tscAsyncResultOnError(pParent); return; @@ -2325,11 +2330,11 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tbufCloseWriter(&bw); } - taosArrayDestroyEx(pSup->pResult, freeInterResult); - taosArrayDestroy(pSup->pColsInfo); - tfree(pSup); + tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + pParent->subState.numOfSub = 0; + tfree(pParent->pSubs); if (resRows == 0) { pParent->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; @@ -2350,8 +2355,10 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { if (c != TSDB_CODE_SUCCESS) { SSqlObj* parent = pSup->pParent; - destroySup(pSup); + tscFreeFirstRoundSup(¶m); taos_free_result(pSql); + parent->subState.numOfSub = 0; + tfree(parent->pSubs); parent->res.code = c; tscAsyncResultOnError(parent); return; @@ -2374,6 +2381,10 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { SSqlObj *pNew = createSubqueryObj(pSql, 0, tscFirstRoundCallback, pSup, TSDB_SQL_SELECT, NULL); SSqlCmd *pCmd = &pNew->cmd; + pNew->freeParam = tscFreeFirstRoundSup; + + tscDebug("%"PRIx64 " add first round supporter:%p", pNew->self, pSup); + SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pCmd); assert(pQueryInfo->numOfTables == 1); @@ -2502,11 +2513,21 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); + pSql->pSubs = calloc(1, POINTER_BYTES); + if (pSql->pSubs == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + + pSql->subState.numOfSub = 1; + + pSql->pSubs[0] = pNew; + tscHandleMasterSTableQuery(pNew); return TSDB_CODE_SUCCESS; _error: - destroySup(pSup); + tscFreeFirstRoundSup((void**)&pSup); taos_free_result(pNew); pSql->res.code = terrno; tscAsyncResultOnError(pSql); @@ -2697,16 +2718,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscFreeRetrieveSup(SSqlObj *pSql) { - SRetrieveSupport *trsupport = pSql->param; +void tscFreeRetrieveSup(void **param) { + SRetrieveSupport *trsupport = *param; - void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); + void* p = atomic_val_compare_exchange_ptr(param, trsupport, 0); if (p == NULL) { - tscDebug("0x%"PRIx64" retrieve supp already released", pSql->self); + tscDebug("retrieve supp already released"); return; } - tscDebug("0x%"PRIx64" start to free subquery supp obj:%p", pSql->self, trsupport); + tscDebug("start to free subquery restrieve supp obj:%p", trsupport); tfree(trsupport->localBuffer); tfree(trsupport); } @@ -2779,12 +2800,12 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 // if failed to process sql, let following code handle the pSql if (ret == TSDB_CODE_SUCCESS) { - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); taos_free_result(pSql); return ret; } else { pParentSql->pSubs[trsupport->subqueryIndex] = pSql; - tscFreeRetrieveSup(pNew); + tscFreeRetrieveSup(&pNew->param); taos_free_result(pNew); return ret; } @@ -2839,7 +2860,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d freed, not finished, total:%d", pParentSql->self, pSql->self, trsupport->subqueryIndex, pState->numOfSub); - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); return; } @@ -2849,7 +2870,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO // release allocated resource tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); @@ -2861,7 +2882,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) { if (userSql != pParentSql) { - tscFreeRetrieveSup(pParentSql); + (*pParentSql->freeParam)(&pParentSql->param); } tscFreeSubobj(userSql); @@ -2945,7 +2966,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d freed, not finished", pParentSql->self, pSql->self, trsupport->subqueryIndex); - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); return; } @@ -2975,7 +2996,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.row = 0; pParentSql->res.numOfGroups = 0; - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); // set the command flag must be after the semaphore been correctly set. if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index bec12547d9f7925494f6ba111400b02a35966cfb..ac91d845f778bcb5672edbde677b88e8c50e7d33 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3798,7 +3798,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { int32_t index = ps->subqueryIndex; bool ret = subAndCheckDone(pSql, pParentSql, index); - tscFreeRetrieveSup(pSql); + tscFreeRetrieveSup(&pSql->param); if (!ret) { tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);