diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8e286c6e1bf6d7f5a59d9ddad792a386f9e0c50c..c4033b548253f9896b1bba2d2798a00d6c357168 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -588,6 +588,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); @@ -605,7 +606,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f60cc2f406e43c9441c4e38c0e0fb8b52ee66ce0..a7ce18198dfcd4fe93675a936d43585b83a43d79 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,7 +241,7 @@ bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader* tqGetWalReader(STqReader* pReader); SSDataBlock* tqGetResultBlock (STqReader* pReader); -int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id); +int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3d715afc7167a8564a0cc0887484120ebe79da34..15b66b8e074f5efd4d656b0e2fe37a9c5fe2d50f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1128,6 +1128,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; + bool done = false; if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. @@ -1157,58 +1158,55 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, - id); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); } - if (!streamTaskRecoverScanStep1Finished(pTask)) { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history data after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - + if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - } - - if (!streamTaskRecoverScanStep2Finished(pTask)) { - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); - streamMetaReleaseTask(pMeta, pTask); - return 0; + streamTaskEndScanWAL(pTask); + } else { + if (!streamTaskRecoverScanStep1Finished(pTask)) { + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history data after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } - int64_t dstVer = pTask->dataRange.range.minVer - 1; - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer); - } + if (!streamTaskRecoverScanStep2Finished(pTask)) { + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } -// int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; -// tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); -// -// // 3. notify downstream tasks to transfer executor state after handle all history blocks. -// if (!pTask->status.transferState) { -// code = streamDispatchTransferStateMsg(pTask); -// if (code != TSDB_CODE_SUCCESS) { -// // todo handle error -// } -// -// pTask->status.transferState = true; -// } + int64_t dstVer = pTask->dataRange.range.minVer - 1; - // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - // 5. resume the related stream task. - streamTryExec(pTask); + pTask->chkInfo.currentVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, + TASK_SCHED_STATUS__INACTIVE); + } - streamMetaReleaseTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pStreamTask); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. + // 5. resume the related stream task. + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pStreamTask); + + tqStartStreamTasks(pTq); + } } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1218,7 +1216,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in stream time window completed, no related fill history task, reset the time " + "s-task:%s scan history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); } else { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 675cbe4549d1bf3d0dc916e34bef968951e1363b..518596a47cd5be15d47e1e6ee555dd6ccaa399de 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -302,13 +302,17 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { return 0; } -int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t code = walNextValidMsg(pReader); if (code != TSDB_CODE_SUCCESS) { return code; } int64_t ver = pReader->pHead->head.version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id); + return TSDB_CODE_SUCCESS; + } if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 921ea2cc68508ef47a447c31935a6ae840d0b6b2..614adfedede0de06625f1d9d11070c0c912a4d71 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -38,9 +38,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - pMeta->walScanCounter -= 1; - times = pMeta->walScanCounter; - + times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); if (pMeta->walScanCounter <= 0) { @@ -242,7 +240,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -280,10 +278,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX; - // append the data for the stream SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); @@ -297,6 +295,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); + + { + if (pTask->info.fillHistory == 1) { + // the maximum version of data in the WAL has reached already, the step2 is done + if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", + pTask->id.idStr, pTask->chkInfo.currentVer); + pTask->status.transferState = true; + } + } + } } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a15b128a99766fd1984b27daa832ca619a861f74..71b9d44a0ada48cc8c989684171999c960972e75 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1775,19 +1775,32 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); - + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; + bool hasUnqualified = false; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*) colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - if (!p[i]) { - hasUnqualified = true; + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c939ea1807490b371393fd15b2bc6d63c8f83a5b..4db3494a3f7c8051cdae18e7d8512ad36815debc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -510,12 +510,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); - if (pTask->info.fillHistory && pTask->status.transferState) { - int32_t code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return 0; - } - } +// if (pTask->info.fillHistory && pTask->status.transferState) { +// int32_t code = streamTransferStateToStreamTask(pTask); +// if (code != TSDB_CODE_SUCCESS) { // todo handle this +// return 0; +// } +// } break; } @@ -584,6 +584,28 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); } +int32_t streamTaskEndScanWAL(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + + // 3. notify downstream tasks to transfer executor state after handle all history blocks. + pTask->status.transferState = true; + + int32_t code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + // the last execution of fill-history task, in order to transfer task operator states. + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = @@ -600,27 +622,11 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && - pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - // fill-history WAL scan has completed + // fill-history WAL scan has completed + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { streamTaskRecoverSetAllStepFinished(pTask); - - double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - if (!pTask->status.transferState) { - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } - - pTask->status.transferState = true; - } - - // the last execution of fill-history task, in order to transfer task operator states. - code = streamExecForAll(pTask); - + streamTaskEndScanWAL(pTask); + } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index fd5d35808ef0f84a3a1677bec10a503998ba2081..41f28f375bb8965d5f9c5d843b68c8a6e9d34e22 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -284,7 +284,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { - qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); + qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr); return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } @@ -507,7 +507,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); @@ -654,7 +654,7 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { return qStreamRecoverSetAllStepFinished(exec); } -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { SVersionRange* pRange = &pTask->dataRange.range; ASSERT(latestVer >= pRange->maxVer); @@ -663,13 +663,16 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { // no input data yet. no need to execute the secondardy scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); qDebug( - "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan", - pTask->id.idStr); + "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " + "related stream task currentVer:%" PRId64, + pTask->id.idStr, latestVer); + return true; } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] pRange->minVer = nextStartVer; pRange->maxVer = latestVer - 1; + return false; } }