diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 2b69eb57a87a2ef8e419b83a4fefc5ce8b9ed2a3..9fbd60639a60f8b3a89de9140d6ef31449945091 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3487,9 +3487,6 @@ static void diff_function(SQLFunctionCtx *pCtx) { int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems; GET_RES_INFO(pCtx)->numOfRes += forwardStep; - - pCtx->pOutput += forwardStep * pCtx->outputBytes; - pCtx->ptsOutputBuf = (char*)pCtx->ptsOutputBuf + forwardStep * TSDB_KEYSIZE; } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index df753d6f18467bc0fb9226e3e19740535dd0dcd2..5904f606b56ba77af67170006a369e72ecdd490a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -324,6 +324,13 @@ int64_t getNumOfResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, i return maxOutput; } +static void setNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { + for (int32_t j = 0; j < numOfOutput; ++j) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]); + pResInfo->numOfRes = 0; + } +} + bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { return false; @@ -3559,16 +3566,20 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR } void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { - SSDataBlock* pDataBlock = pInfo->binfo.pRes; - - if (pInfo->bufCapacity < pDataBlock->info.rows + numOfInputRows) { - int32_t newSize = pDataBlock->info.rows + numOfInputRows; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SSDataBlock* pDataBlock = pBInfo->pRes; + int32_t newSize = pDataBlock->info.rows + numOfInputRows; + if (pInfo->bufCapacity < newSize) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes); if (p != NULL) { pColInfo->pData = p; + + // it starts from the tail of the previously generated results. + pBInfo->pCtx[i].pOutput = pColInfo->pData; + pInfo->bufCapacity = newSize; } else { // longjmp } @@ -3577,7 +3588,13 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - pInfo->binfo.pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; + pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; + + // re-estabilish output buffer pointer. + int32_t functionId = pBInfo->pCtx[i].functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[0].pOutput; + } } } @@ -3697,8 +3714,6 @@ void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) { } } } - - #endif void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { @@ -6383,12 +6398,14 @@ static SSDataBlock* doArithmeticOperation(void* param) { updateOutputBuf(pArithInfo, pBlock->info.rows); arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput); - pArithInfo->binfo.pRes->info.rows += pBlock->info.rows; + pArithInfo->binfo.pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput); if (pArithInfo->binfo.pRes->info.rows >= 4096) { break; } } + setNumOfRes(pArithInfo->binfo.pCtx, pOperator->numOfOutput); + if (pArithInfo->binfo.pRes->info.rows > 0) { return pArithInfo->binfo.pRes; } else { @@ -6414,8 +6431,12 @@ static SSDataBlock* doLimit(void* param) { if (pInfo->total + pBlock->info.rows >= pInfo->limit) { pBlock->info.rows = (pInfo->limit - pInfo->total); + pInfo->total = pInfo->limit; + setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); pOperator->completed = true; + } else { + pInfo->total += pBlock->info.rows; } return pBlock; @@ -6753,8 +6774,9 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->bufCapacity = 4096; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, pInfo->binfo.rowCellInfoOffset);