提交 8890fa57 编写于 作者: H Haojun Liao

fix(stream): execute the stream task directly, instead of executing it in a asynchronized way.

上级 4393375e
...@@ -592,14 +592,21 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -592,14 +592,21 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (pTask->status.transferState) { if (pTask->status.transferState) {
code = streamTransferStateToStreamTask(pTask); code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code; return code;
} }
streamSchedExec(pTask);
} else { // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); // call this function (streamExecForAll) directly.
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), code = streamExecForAll(pTask);
pTask->status.schedStatus); if (code < 0) {
// do nothing
}
} }
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册