diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index d3996ccf7fe85a6405addfcbef023aa2ae6bf08b..f45dd858174ce1a8cc5866f7d76de4c669689dfc 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -43,6 +43,10 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql); char *getArithmeticInputSrc(void *param, const char *name, int32_t colId); +void tscSpinLock(int32_t *lock); + +void tscSpinUnlock(int32_t *lock); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a3a086ce779701e8a7c31c3c7fb4e398f95d97ba..27e9b2ced0070414e291264e343a232f0c40e025 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,7 +317,8 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery + int32_t subLock; + int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query } SSubqueryState; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d4f962063011910aa242c82146a5a3094da89c05..e39bd05c56543b1e789b52b349063015cf938cc2 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -55,6 +55,72 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } + +void tscSpinLock(int32_t *lock) { + int i = 0; + while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) { + if (++i % 100 == 0) { + sched_yield(); + } + } +} + +void tscSpinUnlock(int32_t *lock) { + if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) { + assert(false); + } +} + + +static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { + assert(idx < subState->numOfSub); + assert(subState->states); + + tscSpinLock(&subState->subLock); + + tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); + + subState->states[idx] = state; + + tscSpinUnlock(&subState->subLock); +} + +static bool allSubqueryDone(SSubqueryState *subState) { + bool done = true; + + //lock in caller + + for (int i = 0; i < subState->numOfSub; i++) { + if (0 == subState->states[i]) { + tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub); + done = false; + break; + } else { + tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub); + } + } + + return done; +} + +static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { + assert(idx < subState->numOfSub); + + tscSpinLock(&subState->subLock); + + tscDebug("subquery:%p,%d state set to 1", pSql, idx); + + subState->states[idx] = 1; + + bool done = allSubqueryDone(subState); + + tscSpinUnlock(&subState->subLock); + + return done; +} + + + static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -367,10 +433,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = &pSql->subState; - pState->numOfRemain = numOfSub; - bool success = true; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -403,6 +465,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { success = false; break; } + + subquerySetState(pNew, &pSql->subState, i, 0); tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; @@ -517,23 +581,23 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { SJoinSupporter* p = pSub->param; tscDestroyJoinSupporter(p); - if (pSub->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSub); - } + taos_free_result(pSub); + pSql->pSubs[i] = NULL; } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } -static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSqlObj->subState.numOfRemain > 0); - - if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { - tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); +static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { + if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) { + tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); freeJoinSubqueryObj(pSqlObj); + return; } - tscDestroyJoinSupporter(pSupporter); + //tscDestroyJoinSupporter(pSupporter); } // update the query time range according to the join results on timestamp @@ -785,7 +849,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -802,7 +866,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p failed to malloc memory", pSql); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -844,9 +908,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); return; - } + } SArray *s1 = NULL, *s2 = NULL; int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); @@ -891,7 +956,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; - pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; + memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; @@ -922,7 +987,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -937,7 +1002,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -955,7 +1020,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -1009,9 +1074,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { return; - } + } tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql); @@ -1088,9 +1153,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - assert(pState->numOfRemain > 0); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); + if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) { + tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); return; } @@ -1205,16 +1269,6 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } - // get the number of subquery that need to retrieve the next vnode. - if (orderedPrjQuery) { - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - SSqlObj* pSub = pSql->pSubs[i]; - if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - pSql->subState.numOfRemain++; - } - } - } - for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1242,9 +1296,13 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; + subquerySetState(pSub, &pSql->subState, i, 0); + tscProcessSql(pSub); tryNextVnode = true; } else { + subquerySetState(pSub, &pSql->subState, i, 1); + tscDebug("%p no result in current subquery anymore", pSub); } } @@ -1270,7 +1328,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { // retrieve data from current vnode. tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = NULL; - pSql->subState.numOfRemain = numOfFetch; + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; + if (pSql1 == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql1->res; + + if (pRes1->row >= pRes1->numOfRows) { + subquerySetState(pSql1, &pSql->subState, i, 0); + } + } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; @@ -1370,7 +1440,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // retrieve actual query results from vnode during the second stage join subquery if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -1383,7 +1453,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); pParentSql->res.code = code; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -1410,9 +1480,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { return; - } + } } tscSetupOutputColumnIndex(pParentSql); @@ -1424,6 +1494,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; + + subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0); + tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query // set the command flag must be after the semaphore been correctly set. @@ -1459,8 +1532,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->subState.numOfRemain++] = pNew; - assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub); + pSql->pSubs[tableIndex] = pNew; if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1592,6 +1664,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; pSql->subState.numOfSub = pQueryInfo->numOfTables; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + } + bool hasEmptySub = false; tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); @@ -1627,7 +1707,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine + memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine break; } } @@ -1711,7 +1791,18 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pState->numOfRemain = pState->numOfSub; + if (pState->states == NULL) { + pState->states = calloc(pState->numOfSub, sizeof(*pState->states)); + if (pState->states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscAsyncResultOnError(pSql); + tfree(pMemoryBuf); + return ret; + } + } + + memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); + pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; @@ -1860,7 +1951,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO assert(pSql != NULL); SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1891,14 +1981,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, pState, subqueryIndex)) { + tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscFreeRetrieveSup(pSql); return; - } + } // all subqueries are failed tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, @@ -1963,14 +2051,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return; } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) { + tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscFreeRetrieveSup(pSql); return; - } + } // all sub-queries are returned, start to local merge process pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; @@ -2016,7 +2102,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSqlObj * pParentSql = trsupport->pParentSql; SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -2237,7 +2322,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } } - if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) { + tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); return; } @@ -2271,6 +2357,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); + subquerySetState(pSql, &pParentObj->subState, i, 0); + tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); } } @@ -2285,7 +2373,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->cmd.parseFinished = false; - pParentObj->subState.numOfRemain = numOfFailed; tscResetSqlCmdObj(&pParentObj->cmd); @@ -2361,7 +2448,16 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { // the number of already initialized subqueries int32_t numOfSub = 0; - pSql->subState.numOfRemain = pSql->subState.numOfSub; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + } + + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b44ebb3c98c7bd63a3814aede2b1bf3c2ccb4d95..e7330129814fa5f5c7e2cd0ccf9ce7e53288d2e1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -441,6 +441,8 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index 7e12d30ac91eb49025fbebed05336e15cc6900ac..645ebd278872f8c9d19d8353d8aa857ab0ae73cb 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -15,6 +15,7 @@ print =============== step1 - login system_content curl 127.0.0.1:7111/rest/ print 1-> $system_content if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then + print $system_content return -1 endi diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index d18a3d76760bb61f7ddd5d37f37b073ca40ffba6..56f115051cd4e558839d958a5177164b9fcf7a91 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m $val = 100 if $rows != $val then + print $rows return -1 endi @@ -514,4 +515,4 @@ sql drop table tm2; sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; sql drop database ux1; -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT