diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 24547c1e0bdbfd86b9eb9a4ab45001733fea868e..955dd734cf58582ecfe1bbf9871525be0ebc161c 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -464,6 +464,7 @@ typedef struct SSWindowOperatorInfo { TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows int32_t start; // start row index + bool reptScan; // next round scan } SSWindowOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -473,7 +474,7 @@ typedef struct SStateWindowOperatorInfo { int32_t colIndex; // start row index int32_t start; char* prevData; // previous data - + bool reptScan; } SStateWindowOperatorInfo ; typedef struct SDistinctOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3c999fca0b35fafb0a34d298f8479e42060758ba..7bf5f9bba7e3de8c4973517038e0745eb3b66d35 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -30,6 +30,7 @@ #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) +#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) @@ -1336,6 +1337,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; pInfo->numOfRows = 0; + if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + pInfo->reptScan = true; + pInfo->prevTs = INT64_MIN; + } TSKEY* tsList = (TSKEY*)pColInfoData->pData; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { @@ -1345,7 +1350,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf pInfo->prevTs = tsList[j]; pInfo->numOfRows = 1; pInfo->start = j; - } else if (tsList[j] - pInfo->prevTs <= gap) { + } else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { pInfo->curWindow.ekey = tsList[j]; pInfo->prevTs = tsList[j]; pInfo->numOfRows += 1; @@ -5175,6 +5180,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; + if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + pInfo->reptScan = true; + tfree(pInfo->prevData); + } pInfo->numOfRows = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { @@ -5761,6 +5770,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; + pInfo->reptScan = false; pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -5788,7 +5798,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); - pInfo->prevTs = INT64_MIN; + pInfo->prevTs = INT64_MIN; + pInfo->reptScan = false; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SessionWindowAggOperator";