diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 0c6472f6b367857edbdc92a08e0bc8a263572ee1..bcfe14fcb79e79b6d3965dfd5970421819f27b23 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd void tscDestroyLocalReducer(SSqlObj *pSql); -int32_t tscLocalDoReduce(SSqlObj *pSql); +int32_t tscDoLocalreduce(SSqlObj *pSql); #ifdef __cplusplus } diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index fd0973f319166a9eba73bb89df607f7d121efa9e..aa05c782cc35b59ffcc82bbf0078d1116db392b6 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -187,7 +187,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex); int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo); -SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index); +SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index); void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, @@ -197,7 +197,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); void tscFreeSubqueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd); -void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid); +void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid); int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex); int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo); int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 773924c1105f6fd70b9c86bb03e35f5b29aa9eaf..823fd359cc7af8aa2cd899eeea7e6c1d25651eae 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -273,8 +273,10 @@ struct STSBuf; typedef struct { uint8_t code; - int numOfRows; // num of results in current retrieved - int numOfTotal; // num of total results + int64_t numOfRows; // num of results in current retrieved + int64_t numOfTotal; // num of total results + int64_t numOfTotalInCurrentClause; // num of total result in current subclause + char * pRsp; int rspType; int rspLen; @@ -431,7 +433,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(STscObj *pObj); -bool tscHasReachLimitation(SSqlObj *pSql); +bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes); char *tscGetErrorMsgPayload(SSqlCmd *pCmd); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 8b0edaf9fe735b5d25307967b2f7c06905a54c1b..0e22214147af8c7595c08fe86b1e3db2828a7740 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -118,7 +118,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx if (numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { @@ -285,7 +285,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { } /* update the limit value according to current retrieval results */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) { @@ -312,9 +312,8 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - assert(0); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); assert(pCmd->numOfCols == pQueryInfo->fieldsInfo.numOfOutputCols); for (int i = 0; i < pCmd->numOfCols; ++i) @@ -497,7 +496,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { if (pSql->pStream == NULL) { // check if it is a sub-query of metric query first, if true, enter another routine - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -552,7 +551,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { * transfer the sql function for metric query before get meter/metric meta, * since in callback functions, only tscProcessSql(pStream->pSql) is executed! */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscIncStreamExecutionCount(pSql->pStream); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index bd9a782d8cd052f53b57b85a75ceb3bcf3bf44b8..3000be213a335b8683e6e7a31cea3579df5bad28 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -441,7 +441,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taos_fetch_rows_a(tres, joinRetrieveCallback, param); } else if (numOfRows == 0) { // no data from this vnode anymore - if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { + SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); + + //todo refactor + if (tscProjectionQueryOnSTable(pParentQueryInfo, 0)) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -548,11 +551,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes && - (!tscHasReachLimitation(pSql->pSubs[i]))) { + (!tscHasReachLimitation(pQueryInfo, pRes))) { numOfFetch++; } } else { - if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i]))) { + if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) { numOfFetch++; } } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 744ffd5b2bdc6dbf1c9db900b90cc97525d70d03..4ab63c18e9ad339664b1771bead46aedcb0c9d48 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -448,6 +448,8 @@ static int insertStmtExecute(STscStmt* stmt) { SSqlRes *pRes = &pSql->res; pRes->numOfRows = 0; pRes->numOfTotal = 0; + pRes->numOfTotalInCurrentClause = 0; + pRes->qhandle = 0; pSql->thandle = NULL; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 49d2678ea5deab6d8fe1a0b043bf6205d7b33678..7b92ad5fcf6345e459a96384cb073e64e8666737 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -520,7 +520,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } } - // set the command/globallimit parameters from the first subclause to the sqlcmd object + // set the command/global limit parameters from the first subclause to the sqlcmd object SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0); pCmd->command = pQueryInfo1->command; @@ -5576,10 +5576,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } } - // handle the limit offset value, validate the limit - pQueryInfo->limit = pQuerySql->limit; - - // temporarily save the original limitation value if ((code = parseLimitClause(pQueryInfo, index, pQuerySql, pSql)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index e9a63b028b957e5604ad4c22ecdf636aa0c5d4ef..e7a4941fc179e43019219b6dc2db22cf68e7cacf 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -58,7 +58,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu * the fields and offset attributes in pCmd and pModel may be different due to * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object. */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pReducer->pCtx[i]; @@ -215,7 +215,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd #ifdef _DEBUG_VIEW printf("load data page into mem for build loser tree: %ld rows\n", pDS->filePage.numOfElems); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); @@ -241,7 +241,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd param->pLocalData = pReducer->pLocalDataSrc; param->pDesc = pReducer->pDesc; param->numOfElems = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); param->groupOrderType = pQueryInfo->groupbyExpr.orderType; @@ -434,7 +434,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { } SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // there is no more result, so we release all allocated resource SLocalReducer *pLocalReducer = (SLocalReducer*)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); @@ -500,7 +500,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, tColModel *pModel) { int32_t numOfGroupByCols = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols; @@ -541,7 +541,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); int16_t functionId = tscSqlExprGet(pQueryInfo, 0)->functionId; @@ -787,7 +787,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo SSqlCmd * pCmd = &pSql->cmd; SSqlRes * pRes = &pSql->res; tFilePage *pFinalDataPage = pLocalReducer->pResultBuf; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (pRes->pLocalReducer != pLocalReducer) { /* @@ -802,7 +802,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo // no interval query, no interpolation pRes->data = pLocalReducer->pFinalRes; pRes->numOfRows = pFinalDataPage->numOfElems; - pRes->numOfTotal += pRes->numOfRows; + pRes->numOfTotalInCurrentClause += pRes->numOfRows; if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset < pRes->numOfRows) { @@ -813,23 +813,23 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); pRes->numOfRows -= pQueryInfo->limit.offset; - pRes->numOfTotal -= pQueryInfo->limit.offset; + pRes->numOfTotalInCurrentClause -= pQueryInfo->limit.offset; pQueryInfo->limit.offset = 0; } else { pQueryInfo->limit.offset -= pRes->numOfRows; pRes->numOfRows = 0; - pRes->numOfTotal = 0; + pRes->numOfTotalInCurrentClause = 0; } } - if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotal > pQueryInfo->limit.limit) { + if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotalInCurrentClause > pQueryInfo->limit.limit) { /* impose the limitation of output rows on the final result */ int32_t prevSize = pFinalDataPage->numOfElems; - int32_t overFlow = pRes->numOfTotal - pQueryInfo->limit.limit; + int32_t overFlow = pRes->numOfTotalInCurrentClause - pQueryInfo->limit.limit; assert(overFlow < pRes->numOfRows); - pRes->numOfTotal = pQueryInfo->limit.limit; + pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; pRes->numOfRows -= overFlow; pFinalDataPage->numOfElems -= overFlow; @@ -898,7 +898,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo pRes->data = pLocalReducer->pFinalRes; pRes->numOfRows = newRows; - pRes->numOfTotal += newRows; + pRes->numOfTotalInCurrentClause += newRows; pQueryInfo->limit.offset = 0; break; @@ -924,13 +924,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo } if (pRes->numOfRows > 0) { - if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotal > pQueryInfo->limit.limit) { - int32_t overFlow = pRes->numOfTotal - pQueryInfo->limit.limit; + if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotalInCurrentClause > pQueryInfo->limit.limit) { + int32_t overFlow = pRes->numOfTotalInCurrentClause - pQueryInfo->limit.limit; pRes->numOfRows -= overFlow; assert(pRes->numOfRows >= 0); - pRes->numOfTotal = pQueryInfo->limit.limit; + pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; pFinalDataPage->numOfElems -= overFlow; /* set remain data to be discarded, and reset the interpolation information */ @@ -974,7 +974,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) static void doExecuteSecondaryMerge(SSqlCmd* pCmd, SLocalReducer *pLocalReducer, bool needInit) { // the tag columns need to be set before all functions execution - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for(int32_t j = 0; j < pQueryInfo->fieldsInfo.numOfOutputCols; ++j) { SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j); @@ -1129,7 +1129,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pRes->numOfGroups += 1; // the output group is limited by the slimit clause @@ -1139,7 +1139,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) { // pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec)); // pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows; - // pRes->pGroupRec[pRes->numOfGroups-1].numOfTotal = pRes->numOfTotal; + // pRes->pGroupRec[pRes->numOfGroups-1].numOfTotalInCurrentClause = pRes->numOfTotalInCurrentClause; return false; } @@ -1155,7 +1155,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no SSqlCmd * pCmd = &pSql->cmd; SSqlRes * pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tFilePage *pResBuf = pLocalReducer->pResultBuf; tColModel *pModel = pLocalReducer->resColModel; @@ -1207,8 +1207,9 @@ void resetOutputBuf(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { // static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) { // In handling data in other groups, we need to reset the interpolation information for a new group data pRes->numOfRows = 0; - pRes->numOfTotal = 0; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + pRes->numOfTotalInCurrentClause = 0; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pQueryInfo->limit.offset = pLocalReducer->offset; @@ -1233,7 +1234,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SLocalReducer * pLocalReducer = pRes->pLocalReducer; SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; @@ -1269,7 +1270,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) { bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); int8_t precision = pMeterMetaInfo->pMeterMeta->precision; @@ -1313,7 +1314,7 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) { SSqlRes * pRes = &pSql->res; SLocalReducer *pLocalReducer = pRes->pLocalReducer; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k); @@ -1330,26 +1331,21 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) { doExecuteSecondaryMerge(pCmd, pLocalReducer, true); } -int32_t tscLocalDoReduce(SSqlObj *pSql) { +int32_t tscDoLocalreduce(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + + tscResetForNextRetrieve(pRes); + if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed tscTrace("%s call the drop local reducer", __FUNCTION__); tscDestroyLocalReducer(pSql); - if (pRes) { - pRes->numOfRows = 0; - pRes->row = 0; - } return 0; } - - pRes->row = 0; - pRes->numOfRows = 0; - + SLocalReducer *pLocalReducer = pRes->pLocalReducer; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); // set the data merge in progress int32_t prevStatus = @@ -1397,7 +1393,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) { #if defined(_DEBUG_VIEW) printf("chosen row:\t"); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ee66599f0f5c4b520fa93dddcee38751172959f1..a125fd1ed6407f9d22507bdf6ac424f0ca299287 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -652,18 +652,18 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { } static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); -static int tscLaunchMetricSubQueries(SSqlObj *pSql); +static int tscLaunchSTableSubqueries(SSqlObj *pSql); // todo merge with callback int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pSql->res.qhandle = 0x1; pSql->res.numOfRows = 0; if (pSql->pSubs == NULL) { - pSql->pSubs = malloc(POINTER_BYTES * pSupporter->pState->numOfTotal); + pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); if (pSql->pSubs == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } @@ -874,7 +874,7 @@ int tscProcessSql(SSqlObj *pSql) { if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { /* * (ref. line: 964) - * Before this function returns from tscLaunchMetricSubQueries and continues, pSql may have been released at user + * Before this function returns from tscLaunchSTableSubqueries and continues, pSql may have been released at user * program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack. * * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL, @@ -882,7 +882,7 @@ int tscProcessSql(SSqlObj *pSql) { */ void *fp = pSql->fp; - if (tscLaunchMetricSubQueries(pSql) != TSDB_CODE_SUCCESS) { + if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) { return pRes->code; } @@ -923,7 +923,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState free(pState); } -int tscLaunchMetricSubQueries(SSqlObj *pSql) { +int tscLaunchSTableSubqueries(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -1217,7 +1217,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { #ifdef _DEBUG_VIEW printf("received data from vnode: %d rows\n", pRes->numOfRows); SSrcColumnInfo colInfo[256] = {0}; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tscGetSrcColumnInfo(colInfo, pQueryInfo); tColModelDisplayEx(pDesc->pSchema, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); @@ -2575,8 +2575,8 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->code = tscLocalDoReduce(pSql); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + pRes->code = tscDoLocalreduce(pSql); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { tscSetResultPointer(pQueryInfo, pRes); @@ -3223,7 +3223,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { char name[TSDB_MAX_TAGS_LEN + 1] = {0}; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - tscGetMetricMetaCacheKey(&pSql->cmd, 0, name, pMeterMetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pQueryInfo, name, pMeterMetaInfo->pMeterMeta->uid); #ifdef _DEBUG_VIEW printf("generate the metric key:%s, index:%d\n", name, i); @@ -3646,7 +3646,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0}; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - tscGetMetricMetaCacheKey(pCmd, clauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid); taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); @@ -3712,7 +3712,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { char tagstr[TSDB_MAX_TAGS_LEN] = {0}; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - tscGetMetricMetaCacheKey(pCmd, 0, tagstr, pMeterMetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid); #ifdef _DEBUG_VIEW printf("create metric key:%s, index:%d\n", tagstr, i); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2cfe0f949fed63ffaa3f5cf2aba5754d5bdbcef1..0ce3d39d756b5a50a294fdddb56b076380535c3a 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -200,6 +200,8 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { pRes->numOfRows = 1; pRes->numOfTotal = 0; + pRes->numOfTotalInCurrentClause = 0; + pSql->asyncTblPos = NULL; if (NULL != pSql->pTableHashList) { taosCleanUpHashTable(pSql->pTableHashList); @@ -367,7 +369,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { // secondary merge has handle this situation if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { - pRes->numOfTotal += pRes->numOfRows; + pRes->numOfTotalInCurrentClause += pRes->numOfRows; } SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -457,7 +459,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { * available, go on */ if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && - (!tscHasReachLimitation(pSql->pSubs[i]))) { + (!tscHasReachLimitation(pQueryInfo1, pRes1))) { allSubqueryExhausted = false; break; } @@ -469,7 +471,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { SSqlRes * pRes1 = &pSql->pSubs[i]->res; SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); - if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) && + if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) && tscProjectionQueryOnTable(pQueryInfo1)) || (pRes1->numOfRows == 0)) { hasData = false; @@ -552,6 +554,94 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { return pRes->tsrow; } +/** + * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, + * in case of multi-vnode super table projection query and the result does not reach the limitation. + */ +static bool hasMoreVnodesToTry(SSqlObj *pSql) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes); +} + +static void tscTryQueryNextVnode(SSqlObj *pSql) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + /* + * no result returned from the current virtual node anymore, try the next vnode if exists + * if case of: multi-vnode super table projection query + */ + assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; + + while (++pMeterMetaInfo->vnodeIndex < totalVnode) { + tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, + pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + + /* + * update the limit and offset value for the query on the next vnode, + * according to current retrieval results + * + * NOTE: + * if the pRes->offset is larger than 0, the start returned position has not reached yet. + * Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0. + * The pRes->offset value will be updated by virtual node, during query execution. + */ + if (pQueryInfo->clauseLimit >= 0) { + pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause; + } + + pQueryInfo->limit.offset = pRes->offset; + + assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); + tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, + pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + + /* + * For project query with super table join, the numOfSub is equalled to the number of all subqueries. + * Therefore, we need to reset the value of numOfSubs to be 0. + * + * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + */ + pSql->numOfSubs = 0; + + pCmd->command = TSDB_SQL_SELECT; + assert(pSql->fp == NULL); + + int32_t ret = tscProcessSql(pSql); // todo check for failure + if (ret != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return; + } + + // retrieve data + assert(pCmd->command == TSDB_SQL_SELECT); + pCmd->command = TSDB_SQL_FETCH; + + if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return; + } + + // if the result from current virtual node are empty, try next if exists. otherwise, return the results. + if (pRes->numOfRows > 0) { + break; + } + } + + if (pRes->numOfRows == 0) { + tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); + } +} + TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; @@ -572,7 +662,13 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { return NULL; } - } else if (pRes->row >= pRes->numOfRows) { // not a join query + } else if (pRes->row >= pRes->numOfRows) { + /** + * NOT a join query + * + * If the data block of current result set have been consumed already, try fetch next result + * data block from virtual node. + */ tscResetForNextRetrieve(pRes); if (pCmd->command < TSDB_SQL_LOCAL) { @@ -580,89 +676,17 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { } tscProcessSql(pSql); // retrieve data from virtual node - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - /* - * no result returned from the current virtual node anymore, try the next vnode if exists - * if case of: multi-vnode super table projection query - */ - if (pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; - - while (++pMeterMetaInfo->vnodeIndex < totalVnode) { - tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); - - // reach the maximum number of output rows, abort - if (tscHasReachLimitation(pSql)) { - return NULL; - } - - /* - * update the limit and offset value for the query on the next vnode, - * according to current retrieval results - * - * NOTE: - * if the pRes->offset is larger than 0, the start returned position has not reached yet. - * Therefore, the pRes->numOfRows, as well as pRes->numOfTotal, must be 0. - * The pRes->offset value will be updated by virtual node, during query execution. - */ - if (pQueryInfo->clauseLimit >= 0) { - pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotal; - } - - pQueryInfo->limit.offset = pRes->offset; - - assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, - pSql, pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, - pQueryInfo->clauseLimit); - - /* - * For project query with super table join, the numOfSub is equalled to the number of all subqueries. - * Therefore, we need to reset the value of numOfSubs to be 0. - * - * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. - */ - pSql->numOfSubs = 0; - - pCmd->command = TSDB_SQL_SELECT; - assert(pSql->fp == NULL); - - int32_t ret = tscProcessSql(pSql); // todo check for failure - if (ret != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return NULL; - } - - // retrieve data - assert(pCmd->command == TSDB_SQL_SELECT); - pCmd->command = TSDB_SQL_FETCH; - - if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return NULL; - } - - // if the result from current virtual node are empty, try next if exists. otherwise, return the results. - if (pRes->numOfRows > 0) { - break; - } - } - if (pRes->numOfRows == 0) { - tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); - } + if (hasMoreVnodesToTry(pSql)) { + tscTryQueryNextVnode(pSql); } /* * local reducer has handle this case, - * so no need to add the pRes->numOfRows for metric retrieve + * so no need to add the pRes->numOfRows for super table query */ if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { - pRes->numOfTotal += pRes->numOfRows; + pRes->numOfTotalInCurrentClause += pRes->numOfRows; } if (pRes->numOfRows == 0) { @@ -675,73 +699,43 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; - + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + if (pSql == NULL || pSql->signature != pSql) { globalCode = TSDB_CODE_DISCONNECTED; return NULL; } - // projection query on metric, pipeline retrieve data from vnode list, instead of two-stage merge + /* + * projection query on super table, access each virtual node sequentially retrieve data from vnode list, + * instead of two-stage merge + */ TAOS_ROW rows = taos_fetch_row_impl(res); + + pRes->numOfTotal += pRes->numOfTotalInCurrentClause; + pRes->numOfTotalInCurrentClause = 0; + + // current subclause is completed, try the next subclause + while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { + pSql->cmd.command = TSDB_SQL_SELECT; + pCmd->clauseIndex++; + + assert(pSql->fp == NULL); + + tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); + tscProcessSql(pSql); - // SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - // while (rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) { - // SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - // - // // reach the maximum number of output rows, abort - // if (tscHasReachLimitation(pSql)) { - // return NULL; - // } - // - // /* - // * update the limit and offset value according to current retrieval results - // * Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0; - // */ - // pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - // pQueryInfo->limit.offset = pRes->offset; - // - // assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - // - // /* - // * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so - // * we need to reset the value of numOfSubs to be 0. - // * - // * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. - // */ - // pSql->numOfSubs = 0; - // - // if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - // pCmd->command = TSDB_SQL_SELECT; - // assert(pSql->fp == NULL); - // tscProcessSql(pSql); - // rows = taos_fetch_row_impl(res); - // } - // - // // check!!! - // if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { - // break; - // } - // } - // - // // current subclause is completed, try the next subclause - // if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { - // pSql->cmd.command = TSDB_SQL_SELECT; - // pCmd->clauseIndex++; - // - // assert(pSql->fp == NULL); - // - // tscTrace("%p start next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); - // tscProcessSql(pSql); - // - // rows = taos_fetch_row_impl(res); - // } + // if the rows is not NULL, return immediately + rows = taos_fetch_row_impl(res); + } return rows; } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { SSqlObj *pSql = (SSqlObj *)res; - SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; int nRows = 0; @@ -755,32 +749,19 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { // instead of two-stage mergevnodeProcessMsgFromShell free qhandle nRows = taos_fetch_block_impl(res, rows); - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - while (*rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) { - /* reach the maximum number of output rows, abort */ - if (tscHasReachLimitation(pSql)) { - return 0; - } - - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - - /* update the limit value according to current retrieval results */ - pQueryInfo->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal; - pQueryInfo->limit.offset = pRes->offset; - - if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - pSql->cmd.command = TSDB_SQL_SELECT; - assert(pSql->fp == NULL); - tscProcessSql(pSql); - nRows = taos_fetch_block_impl(res, rows); - } - - // check!!! - if (*rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { - break; - } + // current subclause is completed, try the next subclause + while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { + pSql->cmd.command = TSDB_SQL_SELECT; + pCmd->clauseIndex++; + + assert(pSql->fp == NULL); + + tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); + tscProcessSql(pSql); + + nRows = taos_fetch_block_impl(res, rows); } - + return nRows; } @@ -1042,7 +1023,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) { pRes->numOfRows = 1; pRes->numOfTotal = 0; - + pRes->numOfTotalInCurrentClause = 0; + tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); int32_t sqlLen = strlen(sql); @@ -1172,6 +1154,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { SSqlRes *pRes = &pSql->res; pRes->numOfTotal = 0; // the number of getting table meta from server + pRes->numOfTotalInCurrentClause = 0; + pRes->code = 0; assert(pSql->fp == NULL); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c456324de5f5674097a9ccda5767bf6d1ca828f7..7b1a71d0ce8cf61d6d007508b53dadef93e634c4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -37,12 +37,9 @@ * fullmetername + '.' + '(nil)' + '.' + '(nil)' + relation + '.' + [tagId1, * tagId2,...] + '.' + group_orderType */ -void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* str, uint64_t uid) { +void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) { int32_t index = -1; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoByUid(pQueryInfo, subClauseIndex, uid, &index); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoByUid(pQueryInfo, uid, &index); int32_t len = 0; char tagIdBuf[128] = {0}; @@ -395,7 +392,8 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { pRes->row = 0; pRes->numOfRows = 0; pRes->numOfTotal = 0; - + pRes->numOfTotalInCurrentClause = 0; + pRes->numOfGroups = 0; tfree(pRes->pGroupRec); @@ -1591,13 +1589,14 @@ SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t clauseIndex, int32_t } SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) { + assert(pQueryInfo != NULL); + if (pQueryInfo->pMeterInfo == NULL) { assert(pQueryInfo->numOfTables == 0); return NULL; } - assert(pQueryInfo != NULL && tableIndex >= 0 && tableIndex <= pQueryInfo->numOfTables && - pQueryInfo->pMeterInfo != NULL); + assert(tableIndex >= 0 && tableIndex <= pQueryInfo->numOfTables && pQueryInfo->pMeterInfo != NULL); return pQueryInfo->pMeterInfo[tableIndex]; } @@ -1628,7 +1627,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQuer return TSDB_CODE_SUCCESS; } -SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClauseIndex, uint64_t uid, int32_t* index) { +SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index) { int32_t k = -1; for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { @@ -1642,6 +1641,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClau *index = k; } + assert(k != -1); return tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, k); } @@ -1777,6 +1777,10 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache) } void tscResetForNextRetrieve(SSqlRes* pRes) { + if (pRes == NULL) { + return; + } + pRes->row = 0; pRes->numOfRows = 0; } @@ -1877,7 +1881,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->param = param; char key[TSDB_MAX_TAGS_LEN + 1] = {0}; - tscGetMetricMetaCacheKey(pCmd, pCmd->clauseIndex, key, uid); + tscGetMetricMetaCacheKey(pQueryInfo, key, uid); #ifdef _DEBUG_VIEW printf("the metricmeta key is:%s\n", key); @@ -1980,13 +1984,9 @@ int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* s return TSDB_CODE_INVALID_SQL; } -bool tscHasReachLimitation(SSqlObj* pSql) { - assert(pSql != NULL && pSql->cmd.globalLimit != 0); - - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit); +bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { + assert(pQueryInfo != NULL && pQueryInfo->clauseLimit != 0); + return (pQueryInfo->clauseLimit > 0 && pRes->numOfTotalInCurrentClause >= pQueryInfo->clauseLimit); } char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }