diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d3a42f50278a3d3339c6adf88a72ed977fd79c2..db0509d81d83f4d43a555e64fcafcff9252ea6d8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -604,13 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); -int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); +int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); -int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); - // common -int32_t streamSetParamForScanHistory(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); @@ -624,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 373bd77c29e3e097e331fffacc7ada75778a54b9..0feaeb92c537cfa12e9ab2d4b00cc576af9ac381 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1567,9 +1567,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamMeta* pMeta = pTq->pStreamMeta; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, pReq->taskId); - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active return TSDB_CODE_SUCCESS; } @@ -1581,9 +1580,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active @@ -1591,14 +1589,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pHistoryTask); - } - streamMetaReleaseTask(pMeta, pTask); - if (pHistoryTask != NULL) { + streamTaskPause(pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } @@ -1627,7 +1623,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - streamStartRecoverTask(pTask, igUntreated); + streamStartScanHistoryAsync(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); } else { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 315a04b6bf3832d507a2f035efea9bebc38634d9..bab15bb31d7d94dbfa8472703d6a688a527fa267 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -582,12 +582,9 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->status.transferState) { - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - streamSchedExec(pTask); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { + streamTaskFillHistoryFinished(pTask); + streamTaskEndScanWAL(pTask); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 59caf313404f6d729a362606c47d093ad7d6749d..7c308902f2ccc231225ebc6a8ba24259500848bd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -17,23 +17,30 @@ #include "ttimer.h" #include "wal.h" -static void launchFillHistoryTask(SStreamTask* pTask); -static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +typedef struct SStreamTaskRetryInfo { + SStreamMeta* pMeta; + int32_t taskId; +} SStreamTaskRetryInfo; + +static int32_t streamSetParamForScanHistory(SStreamTask* pTask); +static void launchFillHistoryTask(SStreamTask* pTask); +static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); +static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { +static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); + int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); } -int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { +int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; - streamBuildSourceRecover1Req(pTask, &req, igUntreated); - int32_t len = sizeof(SStreamScanHistoryReq); + initScanHistoryReq(pTask, &req, igUntreated); + int32_t len = sizeof(SStreamScanHistoryReq); void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { return -1; @@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); } - streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); - int32_t code = streamStartRecoverTask(pTask, 0); + streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); + int32_t code = streamStartScanHistoryAsync(pTask, 0); return code; } @@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { } else { qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskSetForReady(pTask, 0); + streamTaskSetReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); @@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { } static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { - streamTaskSetForReady(pTask, numOfReqs); + streamTaskSetReady(pTask, numOfReqs); const char* id = pTask->id.idStr; int8_t status = pTask->status.taskStatus; @@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { +int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; @@ -532,11 +539,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { streamTaskDoCheckDownstreamTasks(pHTask); } -typedef struct SStreamTaskRetryInfo { - SStreamMeta* pMeta; - int32_t taskId; -} SStreamTaskRetryInfo; - static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTaskRetryInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; @@ -646,7 +648,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { } } - // dispatch recover finish req to all related downstream task + // dispatch scan-history finish req to all related downstream task code = streamDispatchScanHistoryFinishMsg(pTask); if (code < 0) { return -1; @@ -655,7 +657,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return 0; } -int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); } @@ -667,7 +669,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { int64_t nextStartVer = pRange->maxVer + 1; if (nextStartVer > latestVer - 1) { // no input data yet. no need to execute the secondardy scan while stream task halt - streamTaskRecoverSetAllStepFinished(pTask); + streamTaskFillHistoryFinished(pTask); qDebug( "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "related stream task currentVer:%" PRId64, @@ -682,7 +684,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { } } - int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; @@ -855,7 +856,7 @@ void streamTaskPause(SStreamTask* pTask) { taosMsleep(100); } - // todo: use the lock of the task. + // todo: use the task lock, stead of meta lock taosWLockLatch(&pMeta->lock); status = pTask->status.taskStatus; @@ -869,6 +870,12 @@ void streamTaskPause(SStreamTask* pTask) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); taosWUnLockLatch(&pMeta->lock); + // in case of fill-history task, stop the tsdb file scan operation. + if (pTask->info.fillHistory == 1) { + void* pExecutor = pTask->exec.pExecutor; + qKillTask(pExecutor, TSDB_CODE_SUCCESS); + } + int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);