From 86c4d343390d3a0ffa45967f5317a496399428b0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 25 Jul 2022 11:19:52 +0800 Subject: [PATCH] fix(stream): recover the overwritten code --- source/libs/executor/src/executorimpl.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bac828a53..b063d0b3bd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3328,6 +3328,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.groupId; + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { // now it is the data from a new group @@ -3336,6 +3337,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // ignore data block in current group if (pLimitInfo->remainGroupOffset > 0) { + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } } @@ -3380,10 +3382,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pLimitInfo->remainOffset >= pInfo->pRes->info.rows) { pLimitInfo->remainOffset -= pInfo->pRes->info.rows; blockDataCleanup(pInfo->pRes); + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); continue; } else if (pLimitInfo->remainOffset < pInfo->pRes->info.rows && pLimitInfo->remainOffset > 0) { blockDataTrimFirstNRows(pInfo->pRes, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); } // check for the limitation in each group @@ -3391,6 +3395,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pLimitInfo->numOfOutputRows + pInfo->pRes->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pInfo->pRes, keepRows); + ASSERT(pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM); if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { pOperator->status = OP_EXEC_DONE; } @@ -3412,7 +3417,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // continue merge data, ignore the group id blockDataMerge(pFinalRes, pInfo->pRes); - if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold) { + if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold && + pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) { continue; } } -- GitLab