diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9ef13cccbf08da5a300bdf53f4a26cb165f065a1..0eda49b1f40876f695594e80874c60407e1ca45c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -129,6 +129,8 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo); +bool tscIsTopbotQuery(SQueryInfo* pQueryInfo); +int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 8a301d1820d4fc88406c4fe5fa5fe708a3ee92d1..89642a067d46d8742ed060292f64e7ca9e8c9cc3 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -338,11 +338,20 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde pReducer->resColModel->capacity = pReducer->nResultBufSize; pReducer->finalModel = pFFModel; + int32_t expandFactor = 1; if (finalmodel->rowSize > 0) { - pReducer->resColModel->capacity /= finalmodel->rowSize; + bool topBotQuery = tscIsTopbotQuery(pQueryInfo); + if (topBotQuery) { + expandFactor = tscGetTopbotQueryParam(pQueryInfo); + pReducer->resColModel->capacity /= (finalmodel->rowSize * expandFactor); + pReducer->resColModel->capacity *= expandFactor; + } else { + pReducer->resColModel->capacity /= finalmodel->rowSize; + } } assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize); + pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || @@ -1440,6 +1449,11 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); tFilePage *tmpBuffer = pLocalMerge->pTempBuffer; + int32_t remain = 1; + if (tscIsTopbotQuery(pQueryInfo)) { + remain = tscGetTopbotQueryParam(pQueryInfo); + } + if (doHandleLastRemainData(pSql)) { return TSDB_CODE_SUCCESS; } @@ -1528,7 +1542,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { * if the previous group does NOT generate any result (pResBuf->num == 0), * continue to process results instead of return results. */ - if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) { + if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num + remain >= pLocalMerge->resColModel->capacity)) { // does not belong to the same group bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 386352bdc741035168aa0158ef41dcea6c7a7a90..438e5618df8e3ee56da72673fdb7c8c71fe936a3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -271,6 +271,41 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { return false; } +bool tscIsTopbotQuery(SQueryInfo* pQueryInfo) { + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExprs; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + if (pExpr == NULL) { + continue; + } + + int32_t functionId = pExpr->functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { + return true; + } + } + + return false; +} + +int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo) { + size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < numOfExprs; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + if (pExpr == NULL) { + continue; + } + + int32_t functionId = pExpr->functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { + return (int32_t) pExpr->param[0].i64; + } + } + + return 0; +} + + void tscClearInterpInfo(SQueryInfo* pQueryInfo) { if (!tscIsPointInterpQuery(pQueryInfo)) { return; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ea117ed47b3a43267e891a49eff79ac759c44511..bd5fdda0f9e0a5a4e7038ed496e2d00a16939884 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2631,6 +2631,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) { + { // set previous window + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + SResultRow* pResult = NULL; + + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; + + STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, + pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, + pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + } bool load = false; for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pTableScanInfo->pCtx[i].functionId; diff --git a/tests/script/general/parser/select_with_tags.sim b/tests/script/general/parser/select_with_tags.sim index 38a514a51b0fce41e003a1040bf264eeca3bf29a..ab34713d901faabe9e247c7a7d07a33efaf778b5 100644 --- a/tests/script/general/parser/select_with_tags.sim +++ b/tests/script/general/parser/select_with_tags.sim @@ -159,6 +159,14 @@ if $data03 != @abc15@ then return -1 endi +#sql select top(c6, 3) from select_tags_mt0 interval(10a) +#sql select top(c6, 3) from select_tags_mt0 interval(10a) group by tbname; + +sql select top(c6, 10) from select_tags_mt0 interval(10a); +if $rows != 12800 then + return -1 +endi + sql select top(c1, 100), tbname, t1, t2 from select_tags_mt0; if $rows != 100 then return -1