未验证 提交 5c758830 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22112 from taosdata/fix/3_liaohj

fix(stream): set downstream task ready state.
...@@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1137,14 +1137,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data // wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it and recheck in 100ms", pId, tqDebug(
pTask->info.taskLevel, pId); "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
taosMsleep(100); taosMsleep(100);
} }
// now we can stop the stream task execution // now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pId, tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pId); pStreamTask->info.taskLevel, pId);
// if it's an source task, extract the last version in wal. // if it's an source task, extract the last version in wal.
......
...@@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -201,6 +201,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (left == 0) { if (left == 0) {
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
pTask->status.downstreamReady = 1;
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册