diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 7529891635f47fb4f2b68ede5b48ed5640c8f00a..e1370513ef28b4eb86d4caf8c7b34481f334c512 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -47,7 +47,6 @@ void tscLockByThread(int64_t *lockedBy); void tscUnlockByThread(int64_t *lockedBy); - #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 652e5bdd47b5bfa3adfdb8b8f58e94399c4de810..085791e45c0920174d6091328c4c5ae4f570eac0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -317,7 +317,8 @@ typedef struct STscObj { } STscObj; typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery + pthread_mutex_t mutex; + int8_t *states; int32_t numOfSub; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query } SSubqueryState; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 6280cc520a9adf7b2d8686455b1524a8b356af83..c9c877c04319d43af7acad0ebe921a62800a20f0 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1427,6 +1427,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { tscResetForNextRetrieve(pRes); if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed + if (pRes->code == TSDB_CODE_SUCCESS) { + return pRes->code; + } + tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); return pRes->code; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2622246111894caf39f3c1c37923d1a7207935b6..ee9526e297f0b748df52e5e7b5cd552a2f39df34 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -55,6 +55,58 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { } } +static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) { + assert(idx < subState->numOfSub); + assert(subState->states); + + pthread_mutex_lock(&subState->mutex); + + tscDebug("subquery:%p,%d state set to %d", pSql, idx, state); + + subState->states[idx] = state; + + pthread_mutex_unlock(&subState->mutex); +} + +static bool allSubqueryDone(SSqlObj *pParentSql) { + bool done = true; + SSubqueryState *subState = &pParentSql->subState; + + //lock in caller + + for (int i = 0; i < subState->numOfSub; i++) { + if (0 == subState->states[i]) { + tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); + done = false; + break; + } else { + tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); + } + } + + return done; +} + +static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { + SSubqueryState *subState = &pParentSql->subState; + + assert(idx < subState->numOfSub); + + pthread_mutex_lock(&subState->mutex); + + tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx); + + subState->states[idx] = 1; + + bool done = allSubqueryDone(pParentSql); + + pthread_mutex_unlock(&subState->mutex); + + return done; +} + + + static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -367,10 +419,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { // scan all subquery, if one sub query has only ts, ignore it tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); - //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = &pSql->subState; - pState->numOfRemain = numOfSub; - bool success = true; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -403,6 +451,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { success = false; break; } + tscClearSubqueryInfo(&pNew->cmd); pSql->pSubs[i] = pNew; @@ -480,6 +529,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } } + subquerySetState(pPrevSub, &pSql->subState, i, 0); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), @@ -517,20 +568,25 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { SJoinSupporter* p = pSub->param; tscDestroyJoinSupporter(p); - if (pSub->res.code == TSDB_CODE_SUCCESS) { - taos_free_result(pSub); - } + taos_free_result(pSub); + pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + + tfree(pSql->subState.states); + + pSql->subState.numOfSub = 0; } -static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSqlObj->subState.numOfRemain > 0); - - if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { - tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); +static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { + if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) { + tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code)); freeJoinSubqueryObj(pSqlObj); + return; } //tscDestroyJoinSupporter(pSupporter); @@ -777,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + // check for the error code firstly if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // todo retry if other subqueries are not failed @@ -785,7 +850,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -802,7 +867,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p failed to malloc memory", pSql); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -844,9 +909,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { + tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); return; - } + } SArray *s1 = NULL, *s2 = NULL; int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); @@ -891,8 +957,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); pParentSql->subState.numOfSub = 2; - pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; - + + memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pParentSql); + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); @@ -915,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + // check for the error code firstly if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // todo retry if other subqueries are not failed yet @@ -922,7 +999,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); pParentSql->res.code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -937,7 +1014,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -955,7 +1032,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); @@ -1009,9 +1086,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; - } + } tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql); @@ -1049,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SSqlRes* pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code); + quitAllSubquery(pSql, pParentSql, pSupporter); + + tscAsyncResultOnError(pParentSql); + + return; + } + + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); @@ -1088,9 +1176,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - assert(pState->numOfRemain > 0); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { + tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub); return; } @@ -1205,15 +1292,16 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { } } - // get the number of subquery that need to retrieve the next vnode. + if (orderedPrjQuery) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { - pSql->subState.numOfRemain++; + subquerySetState(pSub, &pSql->subState, i, 0); } } } + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1270,7 +1358,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { // retrieve data from current vnode. tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = NULL; - pSql->subState.numOfRemain = numOfFetch; + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; + if (pSql1 == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql1->res; + + if (pRes1->row >= pRes1->numOfRows) { + subquerySetState(pSql1, &pSql->subState, i, 0); + } + } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; @@ -1372,7 +1472,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // retrieve actual query results from vnode during the second stage join subquery if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); - quitAllSubquery(pParentSql, pSupporter); + quitAllSubquery(pSql, pParentSql, pSupporter); + tscAsyncResultOnError(pParentSql); return; @@ -1384,7 +1485,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); pParentSql->res.code = code; - quitAllSubquery(pParentSql, pSupporter); + + quitAllSubquery(pSql, pParentSql, pSupporter); tscAsyncResultOnError(pParentSql); return; @@ -1408,9 +1510,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { // In case of consequence query from other vnode, do not wait for other query response here. if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { - if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { return; - } + } } tscSetupOutputColumnIndex(pParentSql); @@ -1422,6 +1524,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); } else { // first retrieve from vnode during the secondary stage sub-query // set the command flag must be after the semaphore been correctly set. @@ -1457,8 +1560,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->subState.numOfRemain++] = pNew; - assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub); + pSql->pSubs[tableIndex] = pNew; if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1590,6 +1692,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { int32_t code = TSDB_CODE_SUCCESS; pSql->subState.numOfSub = pQueryInfo->numOfTables; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + + pthread_mutex_init(&pSql->subState.mutex, NULL); + } + + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pSql); + bool hasEmptySub = false; tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); @@ -1622,14 +1737,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { + int fail = 0; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; + if (fail) { + (*pSub->fp)(pSub->param, pSub, 0); + continue; + } + if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine - break; + pRes->code = code; + (*pSub->fp)(pSub->param, pSub, 0); + fail = 1; } } + if(fail) { + return; + } + pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE; } @@ -1728,7 +1854,21 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pState->numOfRemain = pState->numOfSub; + if (pState->states == NULL) { + pState->states = calloc(pState->numOfSub, sizeof(*pState->states)); + if (pState->states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscAsyncResultOnError(pSql); + tfree(pMemoryBuf); + return ret; + } + + pthread_mutex_init(&pState->mutex, NULL); + } + + memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub); + tscDebug("%p reset all sub states to 0", pSql); + pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; @@ -1877,7 +2017,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO assert(pSql != NULL); SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1908,14 +2047,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) { + tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub); tscFreeRetrieveSup(pSql); return; - } + } // all subqueries are failed tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, @@ -1980,14 +2117,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return; } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { - tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfSub - remain); + if (!subAndCheckDone(pSql, pParentSql, idx)) { + tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex); tscFreeRetrieveSup(pSql); return; - } + } // all sub-queries are returned, start to local merge process pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; @@ -2033,7 +2168,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR SSqlObj * pParentSql = trsupport->pParentSql; SSubqueryState* pState = &pParentSql->subState; - assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -2254,7 +2388,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } } - if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { + if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) { + tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub); return; } @@ -2288,6 +2423,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); + subquerySetState(pSql, &pParentObj->subState, i, 0); + tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); } } @@ -2302,7 +2439,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->cmd.parseFinished = false; - pParentObj->subState.numOfRemain = numOfFailed; tscResetSqlCmdObj(&pParentObj->cmd); @@ -2378,7 +2514,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { // the number of already initialized subqueries int32_t numOfSub = 0; - pSql->subState.numOfRemain = pSql->subState.numOfSub; + if (pSql->subState.states == NULL) { + pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); + if (pSql->subState.states == NULL) { + pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } + + pthread_mutex_init(&pSql->subState.mutex, NULL); + } + + memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); + tscDebug("%p reset all sub states to 0", pSql); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 02cd9b9692ffa3e72270b57a4f1dd2dc146f0257..438836468bcb44b8c7d90a055d7b0a47cd2622e5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -441,6 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) { pSql->pSubs[i] = NULL; } + if (pSql->subState.states) { + pthread_mutex_destroy(&pSql->subState.mutex); + } + + tfree(pSql->subState.states); + pSql->subState.numOfSub = 0; } diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index 7e12d30ac91eb49025fbebed05336e15cc6900ac..645ebd278872f8c9d19d8353d8aa857ab0ae73cb 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -15,6 +15,7 @@ print =============== step1 - login system_content curl 127.0.0.1:7111/rest/ print 1-> $system_content if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then + print $system_content return -1 endi diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index d18a3d76760bb61f7ddd5d37f37b073ca40ffba6..56f115051cd4e558839d958a5177164b9fcf7a91 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m $val = 100 if $rows != $val then + print $rows return -1 endi @@ -514,4 +515,4 @@ sql drop table tm2; sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; sql drop database ux1; -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT