From 5dc2c5cdffdfe2e336369ab0977cd0e81839aee4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Jun 2020 15:43:15 +0800 Subject: [PATCH] [td-225] refactor subquery codes --- src/client/inc/tscLocalMerge.h | 25 +- src/client/inc/tscSubquery.h | 2 - src/client/inc/tscUtil.h | 3 +- src/client/src/tscSubquery.c | 737 +++++++++++++++++---------------- src/query/src/qExecutor.c | 1 - src/util/src/terror.c | 1 - 6 files changed, 393 insertions(+), 376 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 19230e34f1..c073f40546 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSCSECONARYMERGE_H -#define TDENGINE_TSCSECONARYMERGE_H +#ifndef TDENGINE_TSCLOCALMERGE_H +#define TDENGINE_TSCLOCALMERGE_H #ifdef __cplusplus extern "C" { @@ -27,14 +27,7 @@ extern "C" { #include "tsclient.h" #define MAX_NUM_OF_SUBQUERY_RETRY 3 - -/* - * @version 0.1 - * @date 2018/01/05 - * @author liaohj - * management of client-side reducer for metric query - */ - + struct SQLFunctionCtx; typedef struct SLocalDataSource { @@ -60,7 +53,6 @@ typedef struct SLocalReducer { char * prevRowOfInput; tFilePage * pResultBuf; int32_t nResultBufSize; -// char * pBufForInterpo; // intermediate buffer for interpolation tFilePage * pTempBuffer; struct SQLFunctionCtx *pCtx; int32_t rowSize; // size of each intermediate result. @@ -81,13 +73,8 @@ typedef struct SLocalReducer { } SLocalReducer; typedef struct SSubqueryState { - /* - * the number of completed retrieval subquery, once this value equals to numOfVnodes, - * all retrieval are completed.Local merge is launched. - */ - int32_t numOfCompleted; - int32_t numOfTotal; // number of total sub-queries - int32_t code; // code from subqueries + int32_t numOfRemain; // the number of remain unfinished subquery + int32_t numOfTotal; // the number of total sub-queries uint64_t numOfRetrievedRows; // total number of points in this query } SSubqueryState; @@ -128,4 +115,4 @@ int32_t tscDoLocalMerge(SSqlObj *pSql); } #endif -#endif // TDENGINE_TSCSECONARYMERGE_H +#endif // TDENGINE_TSCLOCALMERGE_H diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 368fe2250a..82d490376a 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -26,11 +26,9 @@ extern "C" { void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql); -int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); -void tscDestroyJoinSupporter(SJoinSupporter* pSupporter); int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 581ce00a62..d6562f008d 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -64,7 +64,8 @@ typedef struct SJoinSupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query - int64_t interval; // interval time + int64_t intervalTime; // interval time + int64_t slidingTime; // sliding time SLimitVal limit; // limit info uint64_t uid; // query meter uid SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 4e02325be6..d25afcf00f 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -12,12 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "os.h" #include "tscSubquery.h" -#include -#include -#include -#include "os.h" +#include "qast.h" +#include "tcompare.h" +#include "tschemautil.h" #include "qtsbuf.h" #include "tscLog.h" #include "tsclient.h" @@ -169,7 +169,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - pSupporter->interval = pQueryInfo->intervalTime; + pSupporter->intervalTime = pQueryInfo->intervalTime; + pSupporter->slidingTime = pQueryInfo->slidingTime; pSupporter->limit = pQueryInfo->limit; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index); @@ -186,7 +187,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in return pSupporter; } -void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { +static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { if (pSupporter == NULL) { return; } @@ -217,7 +218,7 @@ void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { * primary timestamp column , the secondary query is not necessary * */ -bool needSecondaryQuery(SQueryInfo* pQueryInfo) { +static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); for (int32_t i = 0; i < numOfCols; ++i) { @@ -233,14 +234,11 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { /* * launch secondary stage query to fetch the result that contains timestamp in set */ -int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { +static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSupporter* pSupporter = NULL; - /* - * If the columns are not involved in the final select clause, - * the corresponding query will not be issued. - */ + //If the columns are not involved in the final select clause, the corresponding query will not be issued. for (int32_t i = 0; i < pSql->numOfSubs; ++i) { pSupporter = pSql->pSubs[i]->param; if (taosArrayGetSize(pSupporter->exprList) > 0) { @@ -251,13 +249,12 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in " - "select clause", pSql, pSql->numOfSubs, numOfSub); + tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. SSubqueryState* pState = pSupporter->pState; pState->numOfTotal = pSql->numOfSubs; - pState->numOfCompleted = (pSql->numOfSubs - numOfSub); + pState->numOfRemain = numOfSub; bool success = true; @@ -301,7 +298,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { // set the second stage sub query for join process TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE); - pQueryInfo->intervalTime = pSupporter->interval; + pQueryInfo->intervalTime = pSupporter->intervalTime; + pQueryInfo->slidingTime = pSupporter->slidingTime; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); @@ -375,6 +373,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { if (pSql->pSubs[i] == NULL) { continue; } + tscDoQuery(pSql->pSubs[i]); } @@ -405,11 +404,9 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - pSqlObj->res.code = pSupporter->pState->code; - if (finished >= numOfTotal) { + assert(pSupporter->pState->numOfRemain > 0); + + if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) { tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); freeJoinSubqueryObj(pSqlObj); } @@ -421,7 +418,7 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { pQueryInfo->window = *win; } -static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { +static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { SSqlObj* pParentSql = pSupporter->pObj; SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); @@ -443,22 +440,7 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* // return; // } // } - - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql, - pSupporter->subqueryIndex); - freeJoinSubqueryObj(pParentSql); - return; - } - - tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); - + SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; @@ -471,7 +453,6 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* updateQueryTimeRange(pParentQueryInfo, &win); tscLaunchSecondPhaseSubqueries(pParentSql); } - } } int32_t tscCompareTidTags(const void* p1, const void* p2) { @@ -545,11 +526,13 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); // set the tags value for ts_comp function - SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); - int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); - pExpr->param->i64Key = tagColId; - pExpr->numOfParams = 1; - + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); + int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + pExpr->param->i64Key = tagColId; + pExpr->numOfParams = 1; + } + // add the filter tag column if (pSupporter->colList != NULL) { size_t s = taosArrayGetSize(pSupporter->colList); @@ -575,7 +558,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* tscProcessSql(pSql); } -static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) { +static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed @@ -587,8 +570,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize); if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) { - tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql); - p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY; + tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj); + pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY; return false; } } @@ -596,8 +579,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, return true; } -static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { - tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql); +static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { + tscTrace("%p all subqueries retrieve complete, do tags match", pParentSql); SJoinSupporter* p1 = pParentSql->pSubs[0]->param; SJoinSupporter* p2 = pParentSql->pSubs[1]->param; @@ -613,7 +596,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, *s1 = taosArrayInit(p1->num, p1->tagSize); *s2 = taosArrayInit(p2->num, p2->tagSize); - if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) { + if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { freeJoinSubqueryObj(pParentSql); pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY; tscQueueAsyncRes(pParentSql); @@ -642,7 +625,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, } } -static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { +static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSqlObj* pParentSql = pSupporter->pObj; @@ -652,239 +635,287 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SSqlRes* pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); - // response of tag retrieve - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { - // todo handle error + // check for the error code firstly + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + // todo retry if other subqueries are not failed - if (numOfRows == 0 || pRes->completed) { - if (numOfRows > 0) { - size_t validLen = pSupporter->tagSize * pRes->numOfRows; + assert(numOfRows < 0 && numOfRows == taos_errno(pSql)); + tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); - size_t length = pSupporter->totalLen + validLen; - char* tmp = realloc(pSupporter->pIdTagList, length); - assert(tmp != NULL); - pSupporter->pIdTagList = tmp; + pParentSql->res.code = numOfRows; + quitAllSubquery(pParentSql, pSupporter); - memcpy(pSupporter->pIdTagList + pSupporter->totalLen,pRes->data, validLen); - pSupporter->totalLen += validLen; - pSupporter->num += pRes->numOfRows; - } + tscQueueAsyncRes(pParentSql); + return; + } - // tuples have been retrieved to client, try tuples from the next vnode - if (hasMoreVnodesToTry(pSql)) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + // keep the results in memory + if (numOfRows > 0) { + size_t validLen = pSupporter->tagSize * pRes->numOfRows; + size_t length = pSupporter->totalLen + validLen; - int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; - pTableMetaInfo->vgroupIndex += 1; - assert(pTableMetaInfo->vgroupIndex < totalVgroups); + // todo handle memory error + char* tmp = realloc(pSupporter->pIdTagList, length); + if (tmp == NULL) { + tscError("%p failed to malloc memory", pSql); - tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", - pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, - pSupporter->num); + pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); + quitAllSubquery(pParentSql, pSupporter); - pCmd->command = TSDB_SQL_SELECT; - tscResetForNextRetrieve(&pSql->res); + tscQueueAsyncRes(pParentSql); + return; + } - // set the callback function - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - return; - } + pSupporter->pIdTagList = tmp; - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - if (finished < numOfTotal) { - return; - } + memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen); + pSupporter->totalLen += validLen; + pSupporter->num += pRes->numOfRows; - // all subquery are returned, start to compare the tags - assert(finished == numOfTotal); + // query not completed, continue to retrieve tid + tag tuples + if (!pRes->completed) { + taos_fetch_rows_a(tres, tidTagRetrieveCallback, param); + return; + } + } - SArray *s1 = NULL, *s2 = NULL; - getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2); + // data in current vnode has all returned to client, try next vnode if exits + // tuples have been retrieved to client, try tuples from the next vnode + if (hasMoreVnodesToTry(pSql)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. - tscTrace("%p free all sub SqlObj and quit", pParentSql); - freeJoinSubqueryObj(pParentSql); - return; - } else { - SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; - SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + pTableMetaInfo->vgroupIndex += 1; + assert(pTableMetaInfo->vgroupIndex < totalVgroups); - SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); - STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); - tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1); + tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", + pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num); - SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); - STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); - tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); + pCmd->command = TSDB_SQL_SELECT; + tscResetForNextRetrieve(&pSql->res); - pSupporter->pState->numOfCompleted = 0; - pSupporter->pState->code = 0; - pSupporter->pState->numOfTotal = 2; + // set the callback function + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + return; + } - for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { - SSqlObj* psub = pParentSql->pSubs[m]; - issueTSCompQuery(psub, psub->param, pParentSql); - } - } + // no data exists in next vnode, mark the query completed + // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. + if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + return; + } - } else { - if (numOfRows < 0) { // error - pSupporter->pState->code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + SArray *s1 = NULL, *s2 = NULL; + getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); + if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. + tscTrace("%p free all sub SqlObj and quit", pParentSql); + freeJoinSubqueryObj(pParentSql); + return; + } - pParentSql->res.code = numOfRows; - tscQueueAsyncRes(pParentSql); - return; - } + // proceed to for ts_comp query + SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; + SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; - size_t length = pSupporter->totalLen + pRes->rspLen; - assert(length > 0); + SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); + STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); + tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1); - char* tmp = realloc(pSupporter->pIdTagList, length); - assert(tmp != NULL); + SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); + STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); + tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); - pSupporter->pIdTagList = tmp; + pSupporter->pState->numOfTotal = 2; + pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; - memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen); - pSupporter->totalLen += pRes->rspLen; - pSupporter->num += pRes->numOfRows; + for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + SSqlObj* sub = pParentSql->pSubs[m]; + issueTSCompQuery(sub, sub->param, pParentSql); + } +} - // continue retrieve data from vnode - taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } +static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { + SJoinSupporter* pSupporter = (SJoinSupporter*)param; + SSqlObj* pParentSql = pSupporter->pObj; + + SSqlObj* pSql = (SSqlObj*)tres; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); + + // check for the error code firstly + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + // todo retry if other subqueries are not failed yet + assert(numOfRows < 0 && numOfRows == taos_errno(pSql)); + tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); + + pParentSql->res.code = numOfRows; + quitAllSubquery(pParentSql, pSupporter); + + tscQueueAsyncRes(pParentSql); return; } - if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - if (numOfRows < 0) { - tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); - pSupporter->pState->code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); + if (numOfRows > 0) { // write the compressed timestamp to disk file + fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f); + fclose(pSupporter->f); + pSupporter->f = NULL; + + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); + if (pBuf == NULL) { // in error process, close the fd + tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); + + pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); + tscQueueAsyncRes(pParentSql); + return; } - if (numOfRows > 0) { // write the compressed timestamp to disk file - fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f); - fclose(pSupporter->f); - pSupporter->f = NULL; - - STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); - if (pBuf == NULL) { // in error process, close the fd - tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); + if (pSupporter->pTSBuf == NULL) { + tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); + pSupporter->pTSBuf = pBuf; + } else { + assert(pQueryInfo->numOfTables == 1); // for subquery, only one + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code - quitAllSubquery(pParentSql, pSupporter); - return; - } + tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); + tsBufDestory(pBuf); + } - if (pSupporter->pTSBuf == NULL) { - tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows); - pSupporter->pTSBuf = pBuf; - } else { - assert(pQueryInfo->numOfTables == 1); // for subquery, only one - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + // continue to retrieve ts-comp data from vnode + if (!pRes->completed) { + getTmpfilePath("ts-join", pSupporter->path); + pSupporter->f = fopen(pSupporter->path, "w"); + pRes->row = pRes->numOfRows; - tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex); - tsBufDestory(pBuf); - } + taos_fetch_rows_a(tres, tsCompRetrieveCallback, param); + return; } + } - if (pRes->completed) { - if (hasMoreVnodesToTry(pSql)) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + if (hasMoreVnodesToTry(pSql)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; - pTableMetaInfo->vgroupIndex += 1; - assert(pTableMetaInfo->vgroupIndex < totalVgroups); + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + pTableMetaInfo->vgroupIndex += 1; + assert(pTableMetaInfo->vgroupIndex < totalVgroups); - tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", - pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, - pRes->numOfClauseTotal); + tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d", + pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, + pRes->numOfClauseTotal); - pCmd->command = TSDB_SQL_SELECT; - tscResetForNextRetrieve(&pSql->res); + pCmd->command = TSDB_SQL_SELECT; + tscResetForNextRetrieve(&pSql->res); - assert(pSupporter->f == NULL); - getTmpfilePath("ts-join", pSupporter->path); - pSupporter->f = fopen(pSupporter->path, "w"); - pRes->row = pRes->numOfRows; + assert(pSupporter->f == NULL); + getTmpfilePath("ts-join", pSupporter->path); + + // TODO check for failure + pSupporter->f = fopen(pSupporter->path, "w"); + pRes->row = pRes->numOfRows; - // set the callback function - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - return; - } else { - tSIntersectionAndLaunchSecQuery(pSupporter, pSql); - } + // set the callback function + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + return; + } - } else { // open a new file to save the incoming result - getTmpfilePath("ts-join", pSupporter->path); - pSupporter->f = fopen(pSupporter->path, "w"); - pRes->row = pRes->numOfRows; + if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + return; + } - taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } - } else { // secondary stage retrieve, driven by taos_fetch_row or other functions - if (numOfRows < 0) { - pSupporter->pState->code = numOfRows; - tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); - } + tscTrace("%p all subquery retrieve ts complete, do ts block intersect", pParentSql); - if (numOfRows >= 0) { - pRes->numOfTotal += pRes->numOfRows; - } + // proceeds to launched secondary query to retrieve final data + SJoinSupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSupporter* p2 = pParentSql->pSubs[1]->param; - if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); + STimeWindow win = TSWINDOW_INITIALIZER; + int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win); + if (num <= 0) { // no result during ts intersect + tscTrace("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql); + freeJoinSubqueryObj(pParentSql); + + return; + } + + // launch the query the retrieve actual results from vnode along with the filtered timestamp + SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); + updateQueryTimeRange(pPQueryInfo, &win); + tscLaunchSecondPhaseSubqueries(pParentSql); +} - // for projection query, need to try next vnode if current vnode is exhausted - if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { - pSupporter->pState->numOfCompleted = 0; - pSupporter->pState->numOfTotal = 1; +static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) { + SJoinSupporter* pSupporter = (SJoinSupporter*)param; - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); + SSqlObj* pParentSql = pSupporter->pObj; + SSubqueryState* pState = pSupporter->pState; - return; - } - } + SSqlObj* pSql = (SSqlObj*)tres; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal, - pParentSql->res.code); + // TODO put to async res? + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + assert(numOfRows == taos_errno(pSql)); - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - pParentSql->res.code = pSupporter->pState->code; - freeJoinSubqueryObj(pParentSql); - pParentSql->res.completed = true; - } + pParentSql->res.code = numOfRows; + tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows)); + } - // update the records for each subquery in parent sql object. - for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { - if (pParentSql->pSubs[i] == NULL) { - continue; - } + if (numOfRows >= 0) { + pRes->numOfTotal += pRes->numOfRows; + } - SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; - pRes1->numOfClauseTotal += pRes1->numOfRows; - } + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); - // data has retrieved to client, build the join results - tscBuildResFromSubqueries(pParentSql); - } else { - tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); + // for projection query, need to try next vnode if current vnode is exhausted + if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { + pState->numOfRemain = 1; + pState->numOfTotal = 1; + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + + if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { + tscTrace("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal); + return; + } + + tscTrace("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code); + + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + freeJoinSubqueryObj(pParentSql); + pParentSql->res.completed = true; + } + + // update the records for each subquery in parent sql object. + for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + if (pParentSql->pSubs[i] == NULL) { + continue; } + + SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; + pRes1->numOfClauseTotal += pRes1->numOfRows; } + + // data has retrieved to client, build the join results + tscBuildResFromSubqueries(pParentSql); } static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { @@ -902,7 +933,7 @@ static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch } pState->numOfTotal = pSql->numOfSubs; - pState->numOfCompleted = pSql->numOfSubs - numOfFetch; + pState->numOfRemain = numOfFetch; return pSupporter; } @@ -990,7 +1021,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex); tscResetForNextRetrieve(pRes1); - pSql1->fp = joinRetrieveCallback; + pSql1->fp = joinRetrieveFinalResCallback; if (pCmd1->command < TSDB_SQL_LOCAL) { pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; @@ -1008,8 +1039,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { tscTrace("%p all subquery response, retrieve data", pSql); + // the column transfer support struct has been built if (pRes->pColumnIndex != NULL) { - return; // the column transfer support struct has been built + return; } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -1045,68 +1077,76 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; - + SJoinSupporter* pSupporter = (SJoinSupporter*)param; - + SSqlObj* pParentSql = pSupporter->pObj; + // There is only one subquery and table for each subquery. SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); - - if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code - joinRetrieveCallback(param, pSql, code); - } else { // first stage query, continue to retrieve compressed time stamp data - pSql->fp = joinRetrieveCallback; - pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); - } - } else { // second stage join subquery - SSqlObj* pParentSql = pSupporter->pObj; + // retrieve actual query results from vnode during the second stage join subquery + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); + quitAllSubquery(pParentSql, pSupporter); + tscQueueAsyncRes(pParentSql); - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, - pSupporter->pState->code); - quitAllSubquery(pParentSql, pSupporter); + return; + } - return; - } + // TODO here retry is required, not directly returns to client + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + assert(taos_errno(pSql) == code); - if (code != TSDB_CODE_SUCCESS) { - tscError("%p sub query failed, code:%s, set global code:%s, index:%d", pSql, tstrerror(code), tstrerror(code), - pSupporter->subqueryIndex); - - pSupporter->pState->code = code; - quitAllSubquery(pParentSql, pSupporter); + tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code); + pParentSql->res.code = code; + + quitAllSubquery(pParentSql, pSupporter); + tscQueueAsyncRes(pParentSql); + + return; + } + + // retrieve tuples from vnode + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { + pSql->fp = tidTagRetrieveCallback; + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + return; + } + + // retrieve ts_comp info from vnode + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { + pSql->fp = tsCompRetrieveCallback; + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + return; + } + + // wait for the other subqueries response from vnode + if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + return; + } + + tscSetupOutputColumnIndex(pParentSql); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + /** + * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of + * data instead of returning to its invoker + */ + if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value + + pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + } else { // first retrieve from vnode during the secondary stage sub-query + // set the command flag must be after the semaphore been correctly set. + if (pParentSql->res.code == TSDB_CODE_SUCCESS) { + (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { - int32_t numOfTotal = pSupporter->pState->numOfTotal; - int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); - - if (finished >= numOfTotal) { - assert(finished == numOfTotal); - - tscSetupOutputColumnIndex(pParentSql); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - /** - * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of - * data instead of returning to its invoker - */ - if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - pSupporter->pState->numOfCompleted = 0; // reset the record value - - pSql->fp = joinRetrieveCallback; // continue retrieve data - pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql); - } else { // first retrieve from vnode during the secondary stage sub-query - // set the command flag must be after the semaphore been correctly set. - if (pParentSql->res.code == TSDB_CODE_SUCCESS) { - (*pParentSql->fp)(pParentSql->param, pParentSql, 0); - } else { - tscQueueAsyncRes(pParentSql); - } - } - } + tscQueueAsyncRes(pParentSql); } } } @@ -1257,14 +1297,15 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); pState->numOfTotal = pQueryInfo->numOfTables; - + pState->numOfRemain = pState->numOfTotal; + tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); - pState->numOfCompleted = pQueryInfo->numOfTables - i - 1; + pState->numOfRemain = i; pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; return pSql->res.code; @@ -1342,6 +1383,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscTrace("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); pState->numOfTotal = pSql->numOfSubs; + pState->numOfRemain = pSql->numOfSubs; + pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; @@ -1436,7 +1479,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows); static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows); -static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) { +static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) { // set no disk space error info #ifdef WINDOWS LPVOID lpMsgBuf; @@ -1448,44 +1491,44 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES #else tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno)); #endif - - trsupport->pState->code = -errCode; + + SSqlObj* pParentSql = trsupport->pParentSqlObj; + + pParentSql->res.code = code; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; pthread_mutex_unlock(&trsupport->queryMutex); - tscHandleSubqueryError(trsupport, tres, trsupport->pState->code); + tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); } void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { - SSqlObj *pPObj = trsupport->pParentSqlObj; + SSqlObj *pParentSql = trsupport->pParentSqlObj; int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pPObj->numOfSubs == pState->numOfTotal); + assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); // retrieved in subquery failed. OR query cancelled in retrieve phase. - if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) { - pState->code = pPObj->res.code; - + if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { + /* * kill current sub-query connection, which may retrieve data from vnodes; * Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED */ pSql->res.numOfRows = 0; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts - tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, - subqueryIndex, pState->code); + tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql, + subqueryIndex, pParentSql->res.code); } if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. - tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); - tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, - subqueryIndex, pState->code); + tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex); + tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql, + subqueryIndex, pParentSql->res.code); } else { - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { /* * current query failed, and the retry count is less than the available * count, retry query clear previous retrieved data, then launch a new sub query @@ -1503,8 +1546,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry", trsupport->pParentSqlObj, pSql); - - pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; return; } @@ -1512,24 +1555,24 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscProcessSql(pNew); return; } else { // reach the maximum retry count, abort - atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows); - tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pPObj, pSql, - tstrerror(numOfRows), subqueryIndex, tstrerror(pState->code)); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); + tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql, + tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code)); } } - - int32_t numOfTotal = pState->numOfTotal; - - int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (finished < numOfTotal) { - tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); + + int32_t remain = -1; + if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, + pState->numOfTotal - remain); + return tscFreeSubSqlObj(trsupport, pSql); } // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pPObj, pState->numOfTotal, tstrerror(pState->code)); - pPObj->res.code = pState->code; - + tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal, + tstrerror(pParentSql->res.code)); + // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, pState->numOfTotal); @@ -1538,13 +1581,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscFreeSubSqlObj(trsupport, pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code); + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); } else { // regular super table query - if (pPObj->res.code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pPObj); + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + tscQueueAsyncRes(pParentSql); } } } @@ -1590,14 +1633,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); } - // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion - // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed. - // In this case, the comparsion between finished value and released pState->numOfTotal is not safe. - int32_t numOfTotal = pState->numOfTotal; - - int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (finished < numOfTotal) { - tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished); + int32_t remain = -1; + if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, + pState->numOfTotal - remain); + return tscFreeSubSqlObj(trsupport, pSql); } @@ -1632,10 +1672,10 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; - int32_t idx = trsupport->subqueryIndex; - SSqlObj * pPObj = trsupport->pParentSqlObj; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; - + int32_t idx = trsupport->subqueryIndex; + SSqlObj * pPObj = trsupport->pParentSqlObj; + SSqlObj *pSql = (SSqlObj *)tres; if (pSql == NULL) { // sql object has been released in error process, return immediately tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx); @@ -1643,13 +1683,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pPObj->numOfSubs == pState->numOfTotal); + assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal); // query process and cancel query process may execute at the same time pthread_mutex_lock(&trsupport->queryMutex); - if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { + if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { return tscHandleSubqueryError(trsupport, pSql, numOfRows); } @@ -1738,18 +1777,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SSubqueryState* pState = trsupport->pState; - assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && - pParentSql->numOfSubs == pState->numOfTotal); - - if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) { + assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); + + // todo set error code + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // stable query is killed, abort further retry trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; if (pParentSql->res.code != TSDB_CODE_SUCCESS) { code = pParentSql->res.code; - } else { - code = pState->code; } tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql, @@ -1766,7 +1803,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (code != TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code)); - atomic_val_compare_exchange_32(&pState->code, 0, code); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); } else { // does not reach the maximum retry time, go on tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); @@ -1775,8 +1812,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d", trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex); - - pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); @@ -1789,11 +1826,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } - if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pState->code); + pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code); - tscHandleSubqueryError(param, tres, pState->code); + tscHandleSubqueryError(param, tres, pParentSql->res.code); } else { // success, proceed to retrieve data from dnode tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); @@ -1813,8 +1850,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { SSqlCmd* pParentCmd = &pParentObj->cmd; SSubqueryState* pState = pSupporter->pState; - int32_t total = pState->numOfTotal; - + // increase the total inserted rows if (numOfRows > 0) { pParentObj->res.numOfRows += numOfRows; @@ -1826,8 +1862,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { } taos_free_result(tres); - int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); - if (completed < total) { + if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { return; } @@ -2157,5 +2192,3 @@ static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } - - diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index bb6eaa68e5..eb62d6c61a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1216,7 +1216,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl // interval query with limit applied int32_t numOfRes = 0; - if (isIntervalQuery(pQuery)) { numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); } else { diff --git a/src/util/src/terror.c b/src/util/src/terror.c index c040a11362..aa6925464e 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -15,7 +15,6 @@ #include #include -#include #include #include -- GitLab