diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 16f208da989b5d7d2f0592f38d95de80797af431..d2f74bdd5923ce0d3aec9afab68bcf13a092ccb8 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -742,6 +742,7 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe int32_t numOfVnodes) { destroyColumnModel(pFinalModel); tOrderDescDestroy(pDesc); + for (int32_t i = 0; i < numOfVnodes; ++i) { pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5cddaa1c4d1541a1f103673fe92d481eec2f0cc0..1161bae424485ad67cb593c21706f3d69a2f6252 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -475,6 +475,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { return; } + // set the master sqlObj flag to cancel query pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; for (int i = 0; i < pSql->subState.numOfSub; ++i) { @@ -498,7 +499,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSubObj->pRpcCtx = NULL; } - tscQueueAsyncRes(pSubObj); // async res? not other functions? +// tscQueueAsyncRes(pSubObj); // async res? not other functions? taosCacheRelease(tscObjCache, (void**) &p, false); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index abfe62c72cc4e5df3be0770aad0cde61071895b8..185d6784d1ca3457016ed8fa2c80ff63daf2b8a3 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1491,9 +1491,16 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { - tscDebug("%p start to free subquery obj", pSql); +static void tscFreeRetrieveSup(SSqlObj *pSql) { + SRetrieveSupport *trsupport = pSql->param; + void* p = atomic_val_compare_exchange_ptr(&pSql->param, trsupport, 0); + if (p == NULL) { + tscDebug("%p retrieve supp already released", pSql); + return; + } + + tscDebug("%p start to free subquery supp obj:%p", pSql, trsupport); // int32_t index = trsupport->subqueryIndex; // SSqlObj *pParentSql = trsupport->pParentSql; @@ -1560,13 +1567,9 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = &pParentSql->subState; - int32_t remain = pState->numOfRemain; - int32_t sub = pState->numOfSub; - UNUSED(remain); - UNUSED(sub); - assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1597,12 +1600,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { + int32_t remain = -1; + if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1614,7 +1617,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, pState->numOfSub); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); @@ -1674,7 +1677,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub - remain); - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); return; } @@ -1694,7 +1697,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - tscFreeSubSqlObj(trsupport, pSql); + tscFreeRetrieveSup(pSql); // set the command flag must be after the semaphore been correctly set. pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; @@ -1711,8 +1714,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR int32_t idx = trsupport->subqueryIndex; SSqlObj * pParentSql = trsupport->pParentSql; - assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; + assert(pSql != NULL && trsupport == pSql->param); SSubqueryState* pState = &pParentSql->subState; assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);