diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 786ee9d079a229eb78e1c04ae3eba20bf5393351..7bf19cec63f89c7c17e79cc6af3a76566cbcaf51 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,6 +15,8 @@ #include "streamInc.h" +#define STREAM_EXEC_MAX_BATCH_NUM 100 + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; void* exec = pTask->exec.executor; @@ -221,6 +223,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchCnt++; input = newRet; streamQueueProcessSuccess(pTask->inputQueue); + if (batchCnt > STREAM_EXEC_MAX_BATCH_NUM) { + break; + } } } }