diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e03718b571418031c394b72ddb27253c2c9546d9..faaea12b1a6fb0105c31fa5ac80db0f241aaabec 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,11 +221,15 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); -int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); +int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); +int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); +bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo); +int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo); void qStreamCloseTsdbReader(void* task); void resetTaskInfo(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a5718b4195ab05ec8415886e12a704c8a396ea31..78acea1cd92a17b299a1f00ccf7fe9ed0c0bde5d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -580,6 +580,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); +bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); +bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); +int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask); // common int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); @@ -588,7 +591,8 @@ int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); // source level -int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); +int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); +int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamSourceScanHistoryData(SStreamTask* pTask); // int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index acd9e851d3f5f058293a22ad7f9973761b1ec6af..e78411096f756f9963f5de1c60f96c05d715ec21 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1337,10 +1337,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { - int32_t size = taosArrayGetSize(pStream->tasks); +int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) { + int32_t size = taosArrayGetSize(tasks); for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); + SArray *pTasks = taosArrayGetP(tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); @@ -1352,6 +1352,16 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { return 0; } +int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { + int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks); + if (code != 0) { + return code; + } + // pStream->pHTasksList is null + // code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList); + return code; +} + static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) { SStreamObj streamObj = {0}; memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); @@ -1473,6 +1483,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } } } + // pStream->pHTasksList is null return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b4eb5957c71c988db1fb7b4a48fa56e7c2b579f0..36abd581ee1a6a38af59003f77555397ad51c4e1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1036,9 +1036,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - streamSourceScanHistoryData(pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { - tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { + streamSourceScanHistoryData(pTask); + } + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1047,44 +1050,50 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); if (pTask->info.fillHistory) { - // 1. stop the related stream task, get the current scan wal version of stream task, ver. - SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - // todo handle error - } - // streamTaskReleaseState(pTask); - // streamTaskReloadState(pStreamTask); + SVersionRange* pRange = NULL; + SStreamTask* pStreamTask = NULL; + if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { + // 1. stop the related stream task, get the current scan wal version of stream task, ver. + pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + if (pStreamTask == NULL) { + // todo handle error + } - ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - // wait for the stream task get ready for scan history data - while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || - pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, - pStreamTask->info.taskLevel); - taosMsleep(100); - } + // wait for the stream task get ready for scan history data + while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || + pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, + pStreamTask->info.taskLevel); + taosMsleep(100); + } - // now we can stop the stream task execution - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pTask->id.idStr); + // now we can stop the stream task execution + pStreamTask->status.taskStatus = TASK_STATUS__HALT; + tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pTask->id.idStr); - // if it's an source task, extract the last version in wal. - int64_t ver = pTask->dataRange.range.maxVer + 1; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - if (latestVer >= ver) { - ver = latestVer; - } + // if it's an source task, extract the last version in wal. + int64_t ver = pTask->dataRange.range.maxVer + 1; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + if (latestVer >= ver) { + ver = latestVer; + } - // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] - SVersionRange* pRange = &pTask->dataRange.range; + // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] + pRange = &pTask->dataRange.range; - pRange->minVer = pRange->maxVer + 1; - pRange->maxVer = ver; - if (pRange->minVer == pRange->maxVer) { - tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr); - } else { + pRange->minVer = pRange->maxVer + 1; + pRange->maxVer = ver; + if (pRange->minVer == pRange->maxVer) { + streamTaskRecoverSetAllStepFinished(pTask); + tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", + pTask->id.idStr); + } + } + + if (!streamTaskRecoverScanStep1Finished(pTask)) { tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); @@ -1092,25 +1101,30 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); - streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); + streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window); + } + if(!streamTaskRecoverScanStep2Finished(pTask)) { streamSourceScanHistoryData(pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { - tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr); streamMetaReleaseTask(pMeta, pTask); return 0; } - - el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el); + streamTaskRecoverSetAllStepFinished(pTask); } - // 3. notify the downstream tasks to transfer executor state after handle all history blocks. - pTask->status.transferState = true; + el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el); + + if (!pTask->status.transferState) { + // 3. notify the downstream tasks to transfer executor state after handle all history blocks. + pTask->status.transferState = true; - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error + code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } } // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. @@ -1119,7 +1133,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s set status to be dropping", pTask->id.idStr); - + // transfer the ownership of executor state + // todo(liuyao) + // streamTaskReleaseState(pTask); + // streamTaskReloadState(pStreamTask); streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1165,6 +1182,11 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int tqError("failed to find task:0x%x", req.taskId); return -1; } + // transfer the ownership of executor state + // todo(liuyao) + // streamTaskReleaseState(pTask); + // SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); + // streamTaskReloadState(pStreamTask); ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; // persistent data? @@ -1398,51 +1420,76 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } +int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { + if (pTask) { + if (!streamTaskShouldPause(&pTask->status)) { + tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + } + streamMetaReleaseTask(pStreamMeta, pTask); + } else { + return -1; + } + return 0; +} + int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); + if (code != 0) { + return code; + } + SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); + code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask); + return code; +} + +int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) { + int32_t vgId = pTq->pStreamMeta->vgId; if (pTask) { - tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + if (streamTaskShouldPause(&pTask->status)) { + atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); + + // no lock needs to secure the access of the version + if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { + // discard all the data when the stream task is suspended. + walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); + tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 + ", schedStatus:%d", + vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + } else { // from the previous paused version and go on + tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", + vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + } + + if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamStartRecoverTask(pTask, igUntreated); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { + tqStartStreamTasks(pTq); + } else { + streamSchedExec(pTask); + } + } streamMetaReleaseTask(pTq->pStreamMeta, pTask); + } else { + return -1; } return 0; } int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - - int32_t vgId = pTq->pStreamMeta->vgId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); - if (pTask) { - atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); - - // no lock needs to secure the access of the version - if (pReq->igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { - // discard all the data when the stream task is suspended. - walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); - tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 - ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); - } else { // from the previous paused version and go on - tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", - vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); - } - - if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamStartRecoverTask(pTask, pReq->igUntreated); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { - tqStartStreamTasks(pTq); - } else { - streamSchedExec(pTask); - } - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - } else { - tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); + if (code != 0) { + return code; } - return 0; + SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); + code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); + return code; } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 6c2a16bb0c17342f1af7ee4f788c7a0f315c99b1..9f8b7491714d8db4613403b6c523aa67bf3d9d8c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -62,6 +62,8 @@ typedef struct { SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; + bool recoverStep1Finished; + bool recoverStep2Finished; int8_t recoverScanFinished; SQueryTableDataCond tableCond; SVersionRange fillHistoryVer; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9542a10389f54a29721038ac7fd251244927d7d9..ad79bc87d79e2ec90e0d669c7c088313e25e499f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -870,7 +870,7 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } -int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { +int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); @@ -879,8 +879,29 @@ int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *p pStreamInfo->fillHistoryVer = *pVerRange; pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; + pStreamInfo->recoverStep1Finished = false; + pStreamInfo->recoverStep2Finished = false; - qDebug("%s set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 1. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 + " - %" PRId64, + GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, + pWindow->ekey); + return 0; +} + +int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + + SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; + + pStreamInfo->fillHistoryVer = *pVerRange; + pStreamInfo->fillHistoryWindow = *pWindow; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; + pStreamInfo->recoverStep1Finished = true; + pStreamInfo->recoverStep2Finished = false; + + qDebug("%s step 2. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); @@ -1022,6 +1043,23 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.recoverScanFinished; } +bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.recoverStep1Finished; +} + +bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.recoverStep2Finished; +} + +int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + pTaskInfo->streamInfo.recoverStep1Finished = true; + pTaskInfo->streamInfo.recoverStep2Finished = true; + return 0; +} + void* qExtractReaderFromStreamScanner(void* scanner) { SStreamScanInfo* pInfo = scanner; return (void*)pInfo->tqReader; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 96a10b9cd1701f0bbd92a9355f62a58b03a49b0b..8793b11ab14d02b5cbacc8d9da0b5ec2278b4f08 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -179,7 +179,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -195,6 +195,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { finished = true; } else { qSetStreamOpOpen(exec); + if (streamTaskShouldPause(&pTask->status)) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return 0; + } } break; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 577022bbef8468c845fa5181edc7e081e6f433cf..45a8ad0dfac4e4e81323f63efcaab9889fb7870e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -55,7 +55,7 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { pRange->minVer, pRange->maxVer); streamSetParamForScanHistoryData(pTask); - streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); + streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); int32_t code = streamStartRecoverTask(pTask, 0); return code; @@ -261,8 +261,12 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } // source -int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { - return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow); +int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { + return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); +} + +int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { + return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { @@ -512,6 +516,21 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return 0; } +bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + return qStreamRecoverScanStep1Finished(exec); +} + +bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + return qStreamRecoverScanStep2Finished(exec); +} + +int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { + void* exec = pTask->exec.pExecutor; + return qStreamRecoverSetAllStepFinished(exec); +} + int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;