diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad9be3f741cecd9edb994aa870e5c7e4..41e9268452ebb8f24be8a0376b5b40103ca219d1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1277,7 +1277,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); streamTaskEndScanWAL(pTask); - streamMetaReleaseTask(pMeta, pTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 @@ -1303,13 +1302,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); } - // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - // 5. resume the related stream task. - streamMetaReleaseTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pStreamTask); - tqStartStreamTasks(pTq); } + + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pStreamTask); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1515,7 +1512,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; - if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) { + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1528,8 +1525,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqStartStreamTasks(pTq); return 0; - } else { - tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); + } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. + // todo add one function to handle this + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); return -1; } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 05767db2869e429a031fcc8a09ba82b6f9a6309a..7832834cee13fe64364da4c0eefd3564c65bf5e9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1046,7 +1046,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; - qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64, + qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); pWindow->skey = INT64_MIN; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cbe7b95bf0d67db3b03fe2f22115232a33034652..1fd2f7edf47c63b14360aa023b1c128c27a170c5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,6 +21,7 @@ #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo(SStreamTask* pTask); +static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -544,8 +545,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { return 0; } +// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not +// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING. bool streamTaskIsIdle(const SStreamTask* pTask) { - return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP || + pTask->status.taskStatus == TASK_STATUS__DROPPING); } int32_t streamTaskEndScanWAL(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7886091401a5b2baf2e4a0f78686526cd8c9fdf0..80b690e20d0504c0b36b0754a596b39306129001 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -330,7 +330,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { } taosWUnLockLatch(&pMeta->lock); - qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + qDebug("s-task:0x%x set task status:%s and start to unregister it", taskId, + streamGetTaskStatusStr(TASK_STATUS__DROPPING)); while (1) { taosRLockLatch(&pMeta->lock);