diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0fccc40889d7400fa62cb74935624d1b7c811f5d..8a0564c12935914e0880429d4097f99ba04fcd09 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2234,7 +2234,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { blockDataCleanup(pResBlock); - int32_t numOfRows = 0; + //int32_t numOfRows = 0; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -2263,7 +2263,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); char* v = colDataGetData(pSrc, i); - colDataAppend(pDst, numOfRows, v, false); + //colDataAppend(pDst, numOfRows, v, false); + colDataAppend(pDst, pResBlock->info.rows, v, false); } pResBlock->info.rows += 1; @@ -2312,12 +2313,47 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } + // add current row if timestamp match + if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); + + char* v = colDataGetData(pSrc, i); + colDataAppend(pDst, pResBlock->info.rows, v, false); + } + + pResBlock->info.rows += 1; + doKeepPrevRows(pSliceInfo, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + + if (pResBlock->info.rows >= pResBlock->info.capacity) { + break; + } + } + if (pSliceInfo->current > pSliceInfo->win.ekey) { doSetOperatorCompleted(pOperator); break; } } } + + //check if need to interpolate after ts range + while (pSliceInfo->current <= pSliceInfo->win.ekey) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, pBlock->info.rows - 1, pResBlock); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pResBlock->info.rows >= pResBlock->info.capacity) { + break; + } + } } // restore the value