提交 e387b2f9 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 4e0f7ffb
...@@ -1109,6 +1109,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1109,6 +1109,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSourceScanHistoryData(pTask); streamSourceScanHistoryData(pTask);
} }
// disable the pause when handling the step2 scan of tsdb data.
// the whole next procedure cann't be stopped.
// todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
streamTaskDisablePause(pTask); streamTaskDisablePause(pTask);
} }
...@@ -1153,7 +1156,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1153,7 +1156,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100); taosMsleep(100);
} }
// todo fix the race condition, when pause msg is received from mnode, add lock here
// now we can stop the stream task execution // now we can stop the stream task execution
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
// todo disable the pause // todo disable the pause
......
...@@ -571,7 +571,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -571,7 +571,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
bool streamTaskIsIdle(const SStreamTask* pTask) { bool streamTaskIsIdle(const SStreamTask* pTask) {
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE/* && pTask->status.taskStatus != TASK_STATUS__HALT*/); return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
} }
int32_t streamTryExec(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册