From 64656a59d9a9f4fb45d3355cd927093900c75620 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 1 Jun 2023 16:28:32 +0800 Subject: [PATCH] reset sream fill block index --- source/libs/executor/src/filloperator.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 90b7f4a77e..519ddc1916 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1002,9 +1002,10 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pInfo->pSrcBlock; uint64_t groupId = pBlock->info.id.groupId; SSDataBlock* pRes = pInfo->pRes; + SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); + TSKEY* tsCol = (TSKEY*)pTsCol->pData; pRes->info.id.groupId = groupId; - SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); - TSKEY* tsCol = (TSKEY*)pTsCol->pData; + pInfo->srcRowIndex++; if (pInfo->srcRowIndex == 0) { keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); @@ -1242,7 +1243,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { SSDataBlock* fillResult = NULL; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows) { + if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) { // If there are delete datablocks, we receive them first. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1281,7 +1282,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { case STREAM_PULL_DATA: { doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - pInfo->srcRowIndex = 0; + pInfo->srcRowIndex = -1; } break; case STREAM_CREATE_CHILD_TABLE: { return pBlock; @@ -1497,7 +1498,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi goto _error; } - pInfo->srcRowIndex = 0; + pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = -- GitLab