diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5316e64035dff10a794138d04fa435b8a877ea32..7aa3913817c87720c94f4bb9016b688b416346f7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -68,25 +68,28 @@ static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, i pthread_mutex_unlock(&subState->mutex); } -static bool allSubqueryDone(SSubqueryState *subState) { +static bool allSubqueryDone(SSqlObj *pParentSql) { bool done = true; + SSubqueryState *subState = &pParentSql->subState; //lock in caller for (int i = 0; i < subState->numOfSub; i++) { if (0 == subState->states[i]) { - tscDebug("subquery:%d is NOT finished, total:%d", i, subState->numOfSub); + tscDebug("subquery:%p,%d is NOT finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); done = false; break; } else { - tscDebug("subquery:%d is finished, total:%d", i, subState->numOfSub); + tscDebug("subquery:%p,%d is finished, total:%d", pParentSql->pSubs[i], i, subState->numOfSub); } } return done; } -static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { +static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { + SSubqueryState *subState = &pParentSql->subState; + assert(idx < subState->numOfSub); pthread_mutex_lock(&subState->mutex); @@ -95,7 +98,7 @@ static bool subAndCheckDone(SSqlObj *pSql, SSubqueryState *subState, int idx) { subState->states[idx] = 1; - bool done = allSubqueryDone(subState); + bool done = allSubqueryDone(pParentSql); pthread_mutex_unlock(&subState->mutex); @@ -580,7 +583,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - if (subAndCheckDone(pSqlSub, &pSqlObj->subState, pSupporter->subqueryIndex)) { + if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) { tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); freeJoinSubqueryObj(pSqlObj); return; @@ -897,7 +900,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); return; } @@ -945,8 +948,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; + memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); - + tscDebug("%p reset all sub states to 0", pParentSql); + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); @@ -1063,7 +1068,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; } @@ -1142,7 +1147,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - if (!subAndCheckDone(pSql, pState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); return; } @@ -1263,7 +1268,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - pSql->subState.states[i] = 0; + subquerySetState(pSub, &pSql->subState, i, 0); } } } @@ -1476,7 +1481,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { - if (!subAndCheckDone(pSql, &pParentSql->subState, pSupporter->subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; } } @@ -1830,6 +1835,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { } memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); + tscDebug("%p reset all sub states to 0", pSql); pRes->code = TSDB_CODE_SUCCESS; @@ -2009,7 +2015,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - if (!subAndCheckDone(pSql, pState, subqueryIndex)) { + if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) { tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscFreeRetrieveSup(pSql); @@ -2079,7 +2085,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return; } - if (!subAndCheckDone(pSql, &pParentSql->subState, idx)) { + if (!subAndCheckDone(pSql, pParentSql, idx)) { tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscFreeRetrieveSup(pSql); @@ -2350,7 +2356,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } } - if (!subAndCheckDone(tres, &pParentObj->subState, pSupporter->index)) { + if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) { tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); return; } @@ -2487,6 +2493,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pSql); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) {