diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c32ec5625b16beea988e3db026c584a29663f27c..65854905bdd12ce7452eddada0b051ff7d1cdcbe 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -361,6 +361,7 @@ typedef struct SSubqueryState { int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query + uint32_t version; } SSubqueryState; typedef struct SSqlObj { @@ -440,8 +441,10 @@ typedef struct SSqlStream { } SSqlStream; SSqlObj* tscAllocSqlObj(); -SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx); +uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql); +SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx, uint32_t stateVersion); void tscReleaseRefOfSubobj(SSqlObj *pSql); +void tscResetAllSubStates(SSqlObj* pSql); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index f07e4c4ea520cb8bdd9dce430df3608e303df628..d0a403f6435b378882e8d5198ec640b90ceede5c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -538,6 +538,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; bool success = true; + uint32_t stateVersion = 0; { pthread_mutex_lock(&pSql->subState.mutex); @@ -688,6 +689,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } //prepare the subqueries object failed, abort @@ -695,23 +698,29 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self, pSql->subState.numOfSub, pSql->res.code); - freeJoinSubqueryObj(pSql); - - return pSql->res.code; + goto _error; } for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (pSub == NULL) { - continue; + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + pSql->res.code = TSDB_CODE_FAILED; + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + goto _error; } - SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); executeQuery(pSub, pQueryInfo); tscReleaseRefOfSubobj(pSub); // REL ref } return TSDB_CODE_SUCCESS; + +_error: + freeJoinSubqueryObj(pSql); + return pSql->res.code; } void freeJoinSubqueryObj(SSqlObj* pSql) { @@ -719,6 +728,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { if (pSql->subState.numOfSub == 0) { goto _out; } + pSql->subState.version ++; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1414,9 +1424,17 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pParentSql); + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { - SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m); // ACQ ref - if (!psub) continue; + SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m, stateVersion); // ACQ ref + if (!psub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pParentSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; + } // proceed to for ts_comp query SSqlCmd* pSubCmd = &psub->cmd; @@ -1428,9 +1446,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables); - memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); + tscResetAllSubStates(pParentSql); tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self); - + issueTsCompQuery(psub, psub->param, pParentSql); tscReleaseRefOfSubobj(psub); // REL ref @@ -1792,8 +1810,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { if (numOfFetch <= 0) { bool tryNextVnode = false; - bool orderedPrjQuery = false; + uint32_t stateVersion = 0; { pthread_mutex_lock(&pSql->subState.mutex); @@ -1819,12 +1837,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (pSub == NULL) { - continue; + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + pSql->res.code = TSDB_CODE_FAILED; + break; } SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); @@ -1879,7 +1904,10 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch); SJoinSupporter* pSupporter = NULL; + uint32_t stateVersion = 0; + { pthread_mutex_lock(&pSql->subState.mutex); + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { @@ -1891,12 +1919,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0); } } + + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (pSql1 == NULL) { - continue; + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; } SSqlRes* pRes1 = &pSql1->res; @@ -2228,6 +2263,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { goto _error; } + uint32_t stateVersion = 0; int errflag = 0; { pthread_mutex_lock(&pSql->subState.mutex); @@ -2256,8 +2292,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } - if (errflag) goto _error; + if (errflag) { + goto _error; + } if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return freeJoinSubqueryObj(pSql); @@ -2265,8 +2305,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } else { int fail = 0; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref - if (!pSub) continue; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!pSub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + code = TSDB_CODE_FAILED; + goto _error; + } if (fail) { (*pSub->fp)(pSub->param, pSub, 0); continue; @@ -2289,7 +2335,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { return; - _error: +_error: pRes->code = code; tscAsyncResultOnError(pSql); } @@ -2727,14 +2773,14 @@ typedef struct SPair { static void doSendQueryReqs(SSchedMsg* pSchedMsg) { SSqlObj* pSql = pSchedMsg->ahandle; SPair* p = pSchedMsg->msg; + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); for (int32_t i = p->first; i < p->second; ++i) { - if (i >= pSql->subState.numOfSub) { - tfree(p); - return; + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!pSub) { + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + break; } - SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref - if (!pSub) continue; SRetrieveSupport* pSupport = pSub->param; tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); @@ -3568,10 +3614,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return pRes->code; } + uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref + SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref if (!pSub) { - tscError("0x%"PRIx64" the %d'th one of (num:%d) sub queries is null.", pSql->self, i, pSql->subState.numOfSub); + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); pRes->code = TSDB_CODE_FAILED; return pRes->code; } @@ -3597,6 +3645,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { goto _error; } + uint32_t stateVersion = 0; int32_t numOfSub = 0; int errflag = 0; @@ -3649,15 +3698,25 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { errflag = 1; } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } - if (errflag) { goto _error; } + if (errflag) { + goto _error; + } pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); // use the local variable for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) { - SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j); // ACQ ref - if (!pSub) continue; + SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j, stateVersion); // ACQ ref + if (!pSub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + return TSDB_CODE_FAILED; + } tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscBuildAndSendRequest(pSub, NULL); tscReleaseRefOfSubobj(pSub); // REL ref diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 97222d86c91edfd96419bbf02e89293bc777d012..02459555bf0f6d33d812beaa17f1b8d080f09f69 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1682,6 +1682,7 @@ void tscFreeSubobj(SSqlObj* pSql) { if (pSql->subState.numOfSub == 0) { goto _out; } + pSql->subState.version ++; tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); @@ -1749,12 +1750,13 @@ SSqlObj* tscAllocSqlObj() { return pNew; } -SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) { +SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx, uint32_t stateVersion) { assert (pSql != NULL); SSqlObj *pSub = NULL; - pthread_mutex_lock(&pSql->subState.mutex); - if (idx < 0 || + { pthread_mutex_lock(&pSql->subState.mutex); + if (stateVersion != tscGetVersionOfSubStateWithoutLock(pSql) || + idx < 0 || idx >= pSql->subState.numOfSub || !pSql->pSubs[idx]) { goto _out; @@ -1763,7 +1765,7 @@ SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) { assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch"); _out: - pthread_mutex_unlock(&pSql->subState.mutex); + pthread_mutex_unlock(&pSql->subState.mutex); } return pSub; } @@ -1772,6 +1774,16 @@ void tscReleaseRefOfSubobj(SSqlObj* pSub) { taosReleaseRef(tscObjRef, pSub->self); } +uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql) { + return pSql->subState.version; +} + +void tscResetAllSubStates(SSqlObj* pSql) { + pthread_mutex_lock(&pSql->subState.mutex); + memset(pSql->subState.states, 0, sizeof(pSql->subState.states[0]) * pSql->subState.numOfSub); + pthread_mutex_unlock(&pSql->subState.mutex); +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -4240,6 +4252,7 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; } pSql->subState.numOfSub = numOfSubqueries; + pSql->subState.version ++; pthread_mutex_unlock(&pSql->subState.mutex); } return code; @@ -4272,6 +4285,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { goto _error; } + uint32_t stateVersion = 0; int errflag = 0; { pthread_mutex_lock(&pSql->subState.mutex); @@ -4327,12 +4341,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { numOfInit++; } + stateVersion = tscGetVersionOfSubStateWithoutLock(pSql); + pthread_mutex_unlock(&pSql->subState.mutex); } - if (errflag) { goto _error; } + if (errflag) { + goto _error; + } for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i); // ACQ REF - if (!psub) continue; + SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref + if (!psub) { + if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) { + continue; + } + tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub); + code = TSDB_CODE_FAILED; + goto _error; + } // create sub query to handle the sub query. SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); @@ -4343,7 +4368,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } executeQuery(psub, pq); - tscReleaseRefOfSubobj(psub); // REL REF + tscReleaseRefOfSubobj(psub); // REL ref } return;