From d88664a3e6cbb4eaa458ef32600f40f0e9d3de10 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Mar 2021 18:12:52 +0800 Subject: [PATCH] [td-3047] refactor. --- src/query/inc/qExecutor.h | 1 + src/query/inc/qResultbuf.h | 2 +- src/query/inc/qUtil.h | 6 +-- src/query/src/qExecutor.c | 47 +++++++++++----------- src/query/src/qUtil.c | 4 +- tests/script/general/parser/topbot.sim | 54 ++++++++++++++++++++++++++ 6 files changed, 84 insertions(+), 30 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 4db86c25aa..f6d92fb8d1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -210,6 +210,7 @@ typedef struct SQuery { int32_t srcRowSize; // todo extract struct int32_t resultRowSize; + int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t maxSrcColumnSize; int32_t tagLen; // tag value length of current query SSqlGroupbyExpr* pGroupbyExpr; diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 080e3f09bb..e46af8c298 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -76,7 +76,7 @@ typedef struct SDiskbasedResultBuf { SResultBufStatis statis; } SDiskbasedResultBuf; -#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes +#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} /** diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index ac9740e7ba..c5bdf28817 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -54,9 +54,9 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) { assert(rowOffset >= 0 && pQuery != NULL); -// int32_t realRowId = (int32_t)(rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery)); + int32_t numOfRows = GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery); // return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId; - return ((char *)page->data) + rowOffset + offset; + return ((char *)page->data) + rowOffset + offset * numOfRows; } bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); @@ -82,7 +82,7 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen); SArray* interResFromBinary(const char* data, int32_t len); void freeInterResult(void* param); -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset); +void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); bool hasRemainData(SGroupResInfo* pGroupResInfo); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 86bbfb32d7..4c9edb5008 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -653,7 +653,8 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow // not assign result buffer yet, add new result buffer if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->resultRowSize); + + int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->intermediateResultRowSize); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -2444,8 +2445,8 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); //TODO refactor -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock, - uint32_t *status) { +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, + uint32_t* status) { *status = BLK_DATA_NO_NEEDED; pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; @@ -2502,19 +2503,23 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* int32_t numOfOutput = pTableScanInfo->numOfOutput; SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; - for (int32_t i = 0; i < numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; - int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; - - // group by + first/last should not apply the first/last block filter - if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) { - (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + if (pQuery->groupbyColumn) { + (*status) = BLK_DATA_ALL_NEEDED; + } else { + for (int32_t i = 0; i < numOfOutput; ++i) { + int32_t functionId = pCtx[i].functionId; + int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; + + // group by + first/last should not apply the first/last block filter + if (functionId != TSDB_FUNC_FIRST_DST && functionId != TSDB_FUNC_LAST_DST) { + (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); + if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + break; + } + } else { + (*status) |= BLK_DATA_ALL_NEEDED; break; } - } else { - (*status) |= BLK_DATA_ALL_NEEDED; - break; } } } @@ -3495,12 +3500,6 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG int32_t numOfRowsToCopy = pRow->numOfRows; - //current output space is not enough to accommodate all data of this page, prepare more space -// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) { -// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult); -// expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo); -// } - pGroupResInfo->index += 1; tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); @@ -4169,7 +4168,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts int32_t ps = DEFAULT_PAGE_SIZE; int32_t rowsize = 0; - getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); + getIntermediateBufInfo(pRuntimeEnv, &ps, &pQuery->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; if (isSTableQuery && !onlyQueryTags(pQuery)) { @@ -4630,7 +4629,7 @@ static SSDataBlock* doSTableAggregate(void* param) { updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -4785,7 +4784,7 @@ static SSDataBlock* doIntervalAgg(void* param) { setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -4894,7 +4893,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index dc8f6d56c0..67ecc51892 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -344,13 +344,13 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { pGroupResInfo->index = 0; } -void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset) { +void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) { if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); - pGroupResInfo->index = offset; + pGroupResInfo->index = 0; assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim index f5c78d07a1..b81a1f9b34 100644 --- a/tests/script/general/parser/topbot.sim +++ b/tests/script/general/parser/topbot.sim @@ -73,6 +73,60 @@ if $row != 100 then return -1 endi +sql select bottom(c3, 5) from tb_tb1 interval(1y); +if $rows != 5 then + return -1 +endi + +if $data01 != 0.00000 then + print expect 0.00000, actual:$data01 + return -1 +endi + +if $data11 != 0.00000 then + return -1 +endi + +if $data21 != 0.00000 then + return -1 +endi + +if $data31 != 0.00000 then + return -1 +endi + +sql select top(c4, 5) from tb_tb1 interval(1y); +if $rows != 5 then + return -1 +endi + +if $data01 != 9.000000000 then + print expect 9.000000000, acutal:$data01 + return -1 +endi + +if $data11 != 9.000000000 then + return -1 +endi + +if $data21 != 9.000000000 then + return -1 +endi + +if $data31 != 9.000000000 then + return -1 +endi + +sql select top(c3, 5) from tb_tb1 interval(40h) +if $rows != 25 then + return -1 +endi + +if $data01 != 9.00000 then + print expect 9.00000, actual:$data01 + return -1 +endi + sql select last(*) from tb_tb9 if $row != 1 then return -1 -- GitLab