提交 9e9d83d4 编写于 作者: H Haojun Liao

refactor: do some internal refactor

上级 bb4f8515
......@@ -90,6 +90,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return 0;
}
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
taosMsleep(1000);
continue;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
......@@ -570,12 +576,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break;
}
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
taosMsleep(1000);
continue;
}
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册