From 1114d358a403662217ae116ae76081069a3a6337 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 3 Apr 2023 17:06:51 +0800 Subject: [PATCH] fix:lose consume data because of exec close if consume while insert data --- source/libs/executor/src/projectoperator.c | 28 +++++++++++----------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 97951d3d8a..0f7820f076 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -232,11 +232,11 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } if (pOperator->status == OP_EXEC_DONE) { - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { - pOperator->status = OP_OPENED; - qDebug("projection in queue model, set status open and return null"); - return NULL; - } +// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { +// pOperator->status = OP_OPENED; +// qDebug("projection in queue model, set status open and return null"); +// return NULL; +// } return NULL; } @@ -262,19 +262,19 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { - pOperator->status = OP_OPENED; - return NULL; - } + if (pBlock == NULL && pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) { +// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { +// pOperator->status = OP_OPENED; +// return NULL; +// } qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); setOperatorCompleted(pOperator); break; } - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { - qDebug("set status recv"); - pOperator->status = OP_EXEC_RECV; - } +// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { +// qDebug("set status recv"); +// pOperator->status = OP_EXEC_RECV; +// } // for stream interval if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || -- GitLab