diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 27e9b2ced0070414e291264e343a232f0c40e025..30d128f006dc1eaf0c87b309c9b45fc28e8369ac 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,7 +317,7 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t subLock; + pthread_mutex_t mutex; int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e39bd05c56543b1e789b52b349063015cf938cc2..51855f1575dc29f2fcb792ca74a62331e9a1bd3c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -55,34 +55,17 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } - -void tscSpinLock(int32_t *lock) { - int i = 0; - while (atomic_val_compare_exchange_32(lock, 0, 1) != 0) { - if (++i % 100 == 0) { - sched_yield(); - } - } -} - -void tscSpinUnlock(int32_t *lock) { - if (atomic_val_compare_exchange_32(lock, 1, 0) != 1) { - assert(false); - } -} - - static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { assert(idx < subState->numOfSub); assert(subState->states); - tscSpinLock(&subState->subLock); - + pthread_mutex_lock(&subState->mutex); + tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); subState->states[idx] = state; - tscSpinUnlock(&subState->subLock); + pthread_mutex_unlock(&subState->mutex); } static bool allSubqueryDone(SSubqueryState *subState) { @@ -106,7 +89,7 @@ static bool allSubqueryDone(SSubqueryState *subState) { static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { assert(idx < subState->numOfSub); - tscSpinLock(&subState->subLock); + pthread_mutex_lock(&subState->mutex); tscDebug("subquery:%p,%d state set to 1", pSql, idx); @@ -114,7 +97,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { bool done = allSubqueryDone(subState); - tscSpinUnlock(&subState->subLock); + pthread_mutex_unlock(&subState->mutex); return done; } @@ -432,6 +415,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); + memset(&pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); bool success = true; @@ -466,7 +450,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { break; } - subquerySetState(pNew, &pSql->subState, i, 0); tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; @@ -585,8 +568,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } @@ -1269,6 +1257,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } + + if (orderedPrjQuery) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { + pSql->subState.states[i] = 0; + } else { + pSql->subState.states[i] = 1; + } + } + } + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -1296,13 +1297,9 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; - subquerySetState(pSub, &pSql->subState, i, 0); - tscProcessSql(pSub); tryNextVnode = true; } else { - subquerySetState(pSub, &pSql->subState, i, 1); - tscDebug("%p no result in current subquery anymore", pSub); } } @@ -1494,8 +1491,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; - - subquerySetState(pSql, &pParentSql->subState, pSupporter->subqueryIndex, 0); tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query @@ -1670,6 +1665,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + + pthread_mutex_init(&pSql->subState.mutex, NULL); } bool hasEmptySub = false; @@ -1707,8 +1704,9 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - memset(pSql->subState.states + i, 1, sizeof(*pSql->subState.states) * (pSql->subState.numOfSub - i)); // the already sent request will continue and do not go to the error process routine - break; + pRes->code = code; + (*pSub->fp)(pSub->param, pSub, 0); + return; } } @@ -1799,6 +1797,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tfree(pMemoryBuf); return ret; } + + pthread_mutex_init(&pState->mutex, NULL); } memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); @@ -2454,6 +2454,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + + pthread_mutex_init(&pSql->subState.mutex, NULL); } memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e7330129814fa5f5c7e2cd0ccf9ce7e53288d2e1..7b6c91d265477aeaeb5b93a20c4f57d43b63eea2 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -441,8 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + tfree(pSql->subState.states); - + pSql->subState.numOfSub = 0; }