diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 4db86c25aac5554fdc2cd820a3ccc97fc7da87b5..f6d92fb8d12b9e48cddc08ff8b245080c5408da4 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -210,6 +210,7 @@ typedef struct SQuery { int32_t srcRowSize; // todo extract struct int32_t resultRowSize; + int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t maxSrcColumnSize; int32_t tagLen; // tag value length of current query SSqlGroupbyExpr* pGroupbyExpr; diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 080e3f09bbfe2231657efa3e6ef29979a0c3b407..e46af8c29891cf52fbbac97c6afb42e0d0571215 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -76,7 +76,7 @@ typedef struct SDiskbasedResultBuf { SResultBufStatis statis; } SDiskbasedResultBuf; -#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes +#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} /** diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index ac9740e7ba5bf112bbaeaed433765c2d7e7c7c0a..c5bdf28817e84877bf50fe59f5ec23bc7b8abba8 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -54,9 +54,9 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) { assert(rowOffset >= 0 && pQuery != NULL); -// int32_t realRowId = (int32_t)(rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); + int32_t numOfRows = GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery); // return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId; - return ((char *)page->data) + rowOffset + offset; + return ((char *)page->data) + rowOffset + offset * numOfRows; } bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); @@ -82,7 +82,7 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen); SArray* interResFromBinary(const char* data, int32_t len); void freeInterResult(void* param); -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset); +void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); bool hasRemainData(SGroupResInfo* pGroupResInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 86bbfb32d7e9e5fb84237b2fc62214cd16030e81..4c9edb5008bb41a699365e1ca69df0355cccb1b6 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -653,7 +653,8 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow // not assign result buffer yet, add new result buffer if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->resultRowSize); + + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->intermediateResultRowSize); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -2444,8 +2445,8 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); //TODO refactor -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, - uint32_t *status) { +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + uint32_t* status) { *status = BLK_DATA_NO_NEEDED; pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; @@ -2502,19 +2503,23 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* int32_t numOfOutput = pTableScanInfo->numOfOutput; SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; - for (int32_t i = 0; i < numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; - int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; - - // group by + first/last should not apply the first/last block filter - if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) { - (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + if (pQuery->groupbyColumn) { + (*status) = BLK_DATA_ALL_NEEDED; + } else { + for (int32_t i = 0; i < numOfOutput; ++i) { + int32_t functionId = pCtx[i].functionId; + int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; + + // group by + first/last should not apply the first/last block filter + if (functionId != TSDB_FUNC_FIRST_DST && functionId != TSDB_FUNC_LAST_DST) { + (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); + if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + break; + } + } else { + (*status) |= BLK_DATA_ALL_NEEDED; break; } - } else { - (*status) |= BLK_DATA_ALL_NEEDED; - break; } } } @@ -3495,12 +3500,6 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG int32_t numOfRowsToCopy = pRow->numOfRows; - //current output space is not enough to accommodate all data of this page, prepare more space -// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { -// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); -// expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo); -// } - pGroupResInfo->index += 1; tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); @@ -4169,7 +4168,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts int32_t ps = DEFAULT_PAGE_SIZE; int32_t rowsize = 0; - getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); + getIntermediateBufInfo(pRuntimeEnv, &ps, &pQuery->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; if (isSTableQuery && !onlyQueryTags(pQuery)) { @@ -4630,7 +4629,7 @@ static SSDataBlock* doSTableAggregate(void* param) { updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -4785,7 +4784,7 @@ static SSDataBlock* doIntervalAgg(void* param) { setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -4894,7 +4893,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index dc8f6d56c08e705309bf82f659d10d52679dba9c..67ecc518929869cdd4d63bbbe90c104abcbedc75 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -344,13 +344,13 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { pGroupResInfo->index = 0; } -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset) { +void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) { if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); - pGroupResInfo->index = offset; + pGroupResInfo->index = 0; assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim index f5c78d07a1d362ffdd5ebe5c989d88cc35a33e72..b81a1f9b34209e34e198977cc30d4bfe0a22cbe2 100644 --- a/tests/script/general/parser/topbot.sim +++ b/tests/script/general/parser/topbot.sim @@ -73,6 +73,60 @@ if $row != 100 then return -1 endi +sql select bottom(c3, 5) from tb_tb1 interval(1y); +if $rows != 5 then + return -1 +endi + +if $data01 != 0.00000 then + print expect 0.00000, actual:$data01 + return -1 +endi + +if $data11 != 0.00000 then + return -1 +endi + +if $data21 != 0.00000 then + return -1 +endi + +if $data31 != 0.00000 then + return -1 +endi + +sql select top(c4, 5) from tb_tb1 interval(1y); +if $rows != 5 then + return -1 +endi + +if $data01 != 9.000000000 then + print expect 9.000000000, acutal:$data01 + return -1 +endi + +if $data11 != 9.000000000 then + return -1 +endi + +if $data21 != 9.000000000 then + return -1 +endi + +if $data31 != 9.000000000 then + return -1 +endi + +sql select top(c3, 5) from tb_tb1 interval(40h) +if $rows != 25 then + return -1 +endi + +if $data01 != 9.00000 then + print expect 9.00000, actual:$data01 + return -1 +endi + sql select last(*) from tb_tb9 if $row != 1 then return -1