diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cffd8d06736198b51bc96fe90c24de6546d53aac..066f83fbcbb96b1df73d50982c0ba2702bc2b296 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,6 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner -// TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a7e912c1f767ba968e391effb6ec222c970fb71f..105ad73362469af4e1665fe00c05e8759e3f353f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1092,7 +1092,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); if (pTask->tsInfo.step1Start == 0) { + ASSERT(pTask->status.pauseAllowed == false); pTask->tsInfo.step1Start = taosGetTimestampMs(); + if (pTask->info.fillHistory == 1) { + streamTaskEnablePause(pTask); + } + } else { + tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); } // we have to continue retrying to successfully execute the scan history task. @@ -1106,14 +1112,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - ASSERT(pTask->status.pauseAllowed == false); if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); + ASSERT(pTask->status.pauseAllowed == true); } streamSourceScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - double el = taosGetTimestampMs() - pTask->tsInfo.step1Start; + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); @@ -1129,26 +1134,24 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pStreamTask = NULL; bool done = false; - // if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { - // 1. stop the related stream task, get the current scan wal version of stream task, ver. + // 1. get the related stream task pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); - pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaSaveTask(pMeta, pTask); + streamMetaUnregisterTask(pMeta, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return -1; } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. - // wait for the stream task get ready for scan history data + // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the + // stream task get ready for scan history data while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", @@ -1158,6 +1161,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution streamTaskHalt(pStreamTask); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e52b8f54a5d812bae9111ad197ea779a746fc29e..0036d72ece5bd65199df15f1a3e6a576824313c7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -404,6 +404,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // clear the link between fill-history task and stream task info + pStreamTask->historyTaskId.taskId = 0; streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -414,6 +416,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { // save to disk taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ecf874a1ac62592847fd4adae2047571f78eb2b5..f9bbe96d9775b5734d92a2d9e878d6b539daa2a5 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -832,7 +832,7 @@ void streamTaskPause(SStreamTask* pTask) { return; } - while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { + while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); @@ -849,8 +849,19 @@ void streamTaskPause(SStreamTask* pTask) { taosMsleep(100); } + // todo: use the lock of the task. + taosWLockLatch(&pMeta->lock); + + status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + taosWUnLockLatch(&pMeta->lock); + qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr); + return; + } + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + taosWUnLockLatch(&pMeta->lock); int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,