diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d2c0a4f6acc29f243a54f747a88ba877b84b95f..e34b27e9b87f610aaa27c775f37346018342331c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -598,7 +598,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); +void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 02509d994d76aa1fcef45aa16577a6e0178bd6f0..1f6c162b9d162d310315aba8a65cecf02337c0de 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1154,16 +1154,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } - streamTaskHalt(pTask); - // now we can stop the stream task execution - // todo upgrade the statu to be HALT from PAUSE or NORMAL - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, id); + streamTaskHalt(pStreamTask); + tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, + id); // if it's an source task, extract the last version in wal. - streamHistoryTaskSetVerRangeStep2(pTask); + pRange = &pTask->dataRange.range; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + streamHistoryTaskSetVerRangeStep2(pTask, latestVer); } if (!streamTaskRecoverScanStep1Finished(pTask)) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 42eb27bb8f9d35c619b9c691a64f77b0a1e8fb73..1c9e2672d19ccaaf1ae76ddd8aa7c87b690031e6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -654,9 +654,8 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { return qStreamRecoverSetAllStepFinished(exec); } -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) { +void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { SVersionRange* pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader); ASSERT(latestVer >= pRange->maxVer); int64_t nextStartVer = pRange->maxVer + 1;