提交 1114d358 编写于 作者: wmmhello's avatar wmmhello

fix:lose consume data because of exec close if consume while insert data

上级 2d6436c5
...@@ -232,11 +232,11 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -232,11 +232,11 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { // if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED; // pOperator->status = OP_OPENED;
qDebug("projection in queue model, set status open and return null"); // qDebug("projection in queue model, set status open and return null");
return NULL; // return NULL;
} // }
return NULL; return NULL;
} }
...@@ -262,19 +262,19 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -262,19 +262,19 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL && pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) { // if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
pOperator->status = OP_OPENED; // pOperator->status = OP_OPENED;
return NULL; // return NULL;
} // }
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { // if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
qDebug("set status recv"); // qDebug("set status recv");
pOperator->status = OP_EXEC_RECV; // pOperator->status = OP_EXEC_RECV;
} // }
// for stream interval // for stream interval
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT || if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册