From dcc175edacd5125c6e1831fed5c56eca3f901a1b Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 4 Jan 2020 17:44:25 +0800 Subject: [PATCH] prepare all subqueries object before launch subqueries. and add some check for global status object during super table query. --- src/client/src/tscServer.c | 195 +++++++++++++++++++++---------------- 1 file changed, 112 insertions(+), 83 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0ba4f1349d..bdac9185eb 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -870,28 +870,26 @@ int tscProcessSql(SSqlObj *pSql) { return doProcessSql(pSql); } -static void doCleanupSubqueries(SSqlObj *pSql, int32_t vnodeIndex, int32_t numOfVnodes, SRetrieveSupport *pTrs, - tOrderDescriptor *pDesc, tColModel *pModel, tExtMemBuffer **pMemoryBuf, - SSubqueryState *pState) { - pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; - - /* - * if i > 0, at least one sub query is issued, the allocated resource is - * freed by it when subquery completed. - */ - if (vnodeIndex == 0) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfVnodes); - tfree(pState); - - if (pTrs != NULL) { - tfree(pTrs->localBuffer); - - pthread_mutex_unlock(&pTrs->queryMutex); - pthread_mutex_destroy(&pTrs->queryMutex); - tfree(pTrs); - } +static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { + assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); + + for(int32_t i = 0; i < numOfSubs; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + assert(pSub != NULL); + + SRetrieveSupport* pSupport = pSub->param; + + tfree(pSupport->localBuffer); + + pthread_mutex_unlock(&pSupport->queryMutex); + pthread_mutex_destroy(&pSupport->queryMutex); + + tfree(pSupport); + + tscFreeSqlObj(pSub); } + + free(pState); } int tscLaunchMetricSubQueries(SSqlObj *pSql) { @@ -911,8 +909,9 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { const uint32_t nBufferSize = (1 << 16); // 64KB SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); - int32_t numOfVnodes = pMeterMetaInfo->pMetricMeta->numOfVnodes; - assert(numOfVnodes > 0); + + int32_t numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes; + assert(numOfSubQueries > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { @@ -923,36 +922,33 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { return pRes->code; } - pSql->pSubs = malloc(POINTER_BYTES * numOfVnodes); - pSql->numOfSubs = numOfVnodes; + pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES); + pSql->numOfSubs = numOfSubQueries; - tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfVnodes); + tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = numOfVnodes; + pState->numOfTotal = numOfSubQueries; pRes->code = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < numOfVnodes; ++i) { - if (pRes->code == TSDB_CODE_QUERY_CANCELLED || pRes->code == TSDB_CODE_CLI_OUT_OF_MEMORY) { - /* - * during launch sub queries, if the master query is cancelled. the remain is ignored and set the retrieveDoneRec - * to the value of remaining not built sub-queries. So, the already issued sub queries can successfully free - * allocated resources. - */ - pState->numOfCompleted = (numOfVnodes - i); - doCleanupSubqueries(pSql, i, numOfVnodes, NULL, pDesc, pModel, pMemoryBuf, pState); - - if (i == 0) { - return pSql->res.code; - } - + int32_t i = 0; + for (; i < numOfSubQueries; ++i) { + SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); + if (trs == NULL) { + tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); break; } - - SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); + trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; trs->pState = pState; + trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); + if (trs->localBuffer == NULL) { + tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + tfree(trs); + break; + } + trs->subqueryIndex = i; trs->pParentSqlObj = pSql; trs->pFinalColModel = pModel; @@ -963,15 +959,10 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { pthread_mutexattr_destroy(&mutexattr); SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL); - if (pNew == NULL) { - pState->numOfCompleted = (numOfVnodes - i); - doCleanupSubqueries(pSql, i, numOfVnodes, trs, pDesc, pModel, pMemoryBuf, pState); - - if (i == 0) { - return pSql->res.code; - } - + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + tfree(trs->localBuffer); + tfree(trs); break; } @@ -979,9 +970,31 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { if (pSql->cmd.tsBuf) { pNew->cmd.tsBuf = tsBufClone(pSql->cmd.tsBuf); } - - tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, trs->subqueryIndex); - tscProcessSql(pNew); + + tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); + } + + if (i < numOfSubQueries) { + tscError("%p failed to prepare subquery structure and launch subqueries", pSql); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); + doCleanupSubqueries(pSql, i, pState); + return pRes->code; // free all allocated resource + } + + if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries); + doCleanupSubqueries(pSql, i, pState); + return pRes->code; + } + + for(int32_t j = 0; j < numOfSubQueries; ++j) { + SSqlObj* pSub = pSql->pSubs[j]; + SRetrieveSupport* pSupport = pSub->param; + + tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); + tscProcessSql(pSub); } return TSDB_CODE_SUCCESS; @@ -1032,10 +1045,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + pPObj->numOfSubs == pState->numOfTotal); /* retrieved in subquery failed. OR query cancelled in retrieve phase. */ - if (trsupport->pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { - trsupport->pState->code = -(int)pPObj->res.code; + if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { + pState->code = -(int)pPObj->res.code; /* * kill current sub-query connection, which may retrieve data from vnodes; @@ -1044,15 +1060,15 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq pSql->res.numOfRows = 0; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, - subqueryIndex, trsupport->pState->code); + subqueryIndex, pState->code); } if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, - subqueryIndex, trsupport->pState->code); + subqueryIndex, pState->code); } else { - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && trsupport->pState->code == TSDB_CODE_SUCCESS) { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) { /* * current query failed, and the retry count is less than the available * count, retry query clear previous retrieved data, then launch a new sub query @@ -1071,7 +1087,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry", trsupport->pParentSqlObj, pSql); - trsupport->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; return; } @@ -1079,24 +1095,26 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq tscProcessSql(pNew); return; } else { // reach the maximum retry count, abort - atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); + atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows); tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, - numOfRows, subqueryIndex, trsupport->pState->code); + numOfRows, subqueryIndex, pState->code); } } - if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { + int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (finished < pState->numOfTotal) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); return tscFreeSubSqlObj(trsupport, pSql); } // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, trsupport->pState->numOfTotal, - trsupport->pState->code); - pPObj->res.code = -(trsupport->pState->code); + tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal, + pState->code); + pPObj->res.code = -(pState->code); // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, - trsupport->pState->numOfTotal); + pState->numOfTotal); tfree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); @@ -1134,10 +1152,14 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { return; } + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + pPObj->numOfSubs == pState->numOfTotal); + // query process and cancel query process may execute at the same time pthread_mutex_lock(&trsupport->queryMutex); - if (numOfRows < 0 || trsupport->pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { + if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { return tscHandleSubRetrievalError(trsupport, pSql, numOfRows); } @@ -1150,10 +1172,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { if (numOfRows > 0) { assert(pRes->numOfRows == numOfRows); - atomic_add_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows); + atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql, - pRes->numOfRows, trsupport->pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); + pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx); #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); @@ -1208,8 +1230,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { /* set no disk space error info, and abort retry */ return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE); } - - if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) { + + int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (finished < pState->numOfTotal) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); return tscFreeSubSqlObj(trsupport, pSql); } @@ -1217,10 +1241,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { pDesc->pSchema->maxCapacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj, - trsupport->pState->numOfTotal, trsupport->pState->numOfCompleted); + pState->numOfTotal, pState->numOfCompleted); tscClearInterpInfo(&pPObj->cmd); - tscCreateLocalReducer(trsupport->pExtMemBuffer, trsupport->pState->numOfTotal, pDesc, trsupport->pFinalColModel, + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, &pPObj->cmd, &pPObj->res); tscTrace("%p build loser tree completed", pPObj); @@ -1229,7 +1253,8 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { pPObj->res.row = 0; // only free once - free(trsupport->pState); + tfree(trsupport->pState); + tscFreeSubSqlObj(trsupport, pSql); if (pPObj->fp == NULL) { @@ -1322,13 +1347,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; } - if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || trsupport->pState->code != TSDB_CODE_SUCCESS) { + SSubqueryState* pState = trsupport->pState; + assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && + trsupport->pParentSqlObj->numOfSubs == pState->numOfTotal); + + if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { // metric query is killed, Note: code must be less than 0 trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS) { code = -(int)(trsupport->pParentSqlObj->res.code); } else { - code = trsupport->pState->code; + code = pState->code; } tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql, trsupport->subqueryIndex, code); @@ -1344,7 +1373,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (code != TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code); - atomic_val_compare_exchange_32(&trsupport->pState->code, 0, code); + atomic_val_compare_exchange_32(&pState->code, 0, code); } else { // does not reach the maximum retry count, go on tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry); @@ -1353,7 +1382,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex); - trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; + pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { assert(pNew->cmd.pMeterInfo[0]->pMeterMeta != NULL && pNew->cmd.pMeterInfo[0]->pMetricMeta != NULL); @@ -1363,17 +1392,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } - if (trsupport->pState->code != TSDB_CODE_SUCCESS) { // failed, abort + if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort if (vnodeInfo != NULL) { tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->subqueryIndex, trsupport->pState->code); + trsupport->subqueryIndex, pState->code); } else { tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, - trsupport->subqueryIndex, trsupport->pState->code); + trsupport->subqueryIndex, pState->code); } - tscRetrieveFromVnodeCallBack(param, tres, trsupport->pState->code); + tscRetrieveFromVnodeCallBack(param, tres, pState->code); } else { // success, proceed to retrieve data from dnode tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, -- GitLab