From d838bacdbe270d9359cb72292f4466c014dea3d7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 14 Oct 2020 18:48:24 +0800 Subject: [PATCH] [td-1637] --- src/client/src/tscLocalMerge.c | 1 + src/client/src/tscServer.c | 3 ++- src/client/src/tscSubquery.c | 33 ++++++++++++++++++--------------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 16f208da98..d2f74bdd59 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -742,6 +742,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe int32_t numOfVnodes) { destroyColumnModel(pFinalModel); tOrderDescDestroy(pDesc); + for (int32_t i = 0; i < numOfVnodes; ++i) { pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5cddaa1c4d..1161bae424 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -475,6 +475,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { return; } + // set the master sqlObj flag to cancel query pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; for (int i = 0; i < pSql->subState.numOfSub; ++i) { @@ -498,7 +499,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSubObj->pRpcCtx = NULL; } - tscQueueAsyncRes(pSubObj); // async res? not other functions? +// tscQueueAsyncRes(pSubObj); // async res? not other functions? taosCacheRelease(tscObjCache, (void**) &p, false); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index abfe62c72c..185d6784d1 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1491,9 +1491,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { - tscDebug("%p start to free subquery obj", pSql); +static void tscFreeRetrieveSup(SSqlObj *pSql) { + SRetrieveSupport *trsupport = pSql->param; + void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); + if (p == NULL) { + tscDebug("%p retrieve supp already released", pSql); + return; + } + + tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); // int32_t index = trsupport->subqueryIndex; // SSqlObj *pParentSql = trsupport->pParentSql; @@ -1560,13 +1567,9 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = &pParentSql->subState; - int32_t remain = pState->numOfRemain; - int32_t sub = pState->numOfSub; - UNUSED(remain); - UNUSED(sub); - assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1597,12 +1600,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { + int32_t remain = -1; + if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1614,7 +1617,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, pState->numOfSub); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); @@ -1674,7 +1677,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1694,7 +1697,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // set the command flag must be after the semaphore been correctly set. pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; @@ -1711,8 +1714,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR int32_t idx = trsupport->subqueryIndex; SSqlObj * pParentSql = trsupport->pParentSql; - assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; + assert(pSql != NULL && trsupport == pSql->param); SSubqueryState* pState = &pParentSql->subState; assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); -- GitLab