From ff2bf356037a720f45d59e09f9d505eca9e978db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jul 2023 15:08:39 +0800 Subject: [PATCH] fix(stream): fix error in set the version range for secondary scan. --- source/dnode/vnode/src/tq/tq.c | 42 ++++++++++++------------- source/dnode/vnode/src/tsdb/tsdbRead.c | 9 ++++-- source/libs/executor/src/executor.c | 4 +-- source/libs/executor/src/scanoperator.c | 40 +++++++++++------------ source/libs/stream/src/streamRecover.c | 9 +++--- 5 files changed, 53 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 250c94c2f9..9702779470 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1061,7 +1061,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; char* msg = pMsg->pCont; - SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamMeta* pMeta = pTq->pStreamMeta; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); @@ -1107,7 +1107,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; - SStreamTask* pStreamTask = NULL; + SStreamTask* pStreamTask = NULL; if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. 
@@ -1120,9 +1120,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || - pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms", - pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr); + pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + tqDebug( + "s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms", + pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr); taosMsleep(100); } @@ -1132,36 +1133,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->info.taskLevel, pId); // if it's an source task, extract the last version in wal. - int64_t ver = pTask->dataRange.range.maxVer + 1; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - if (latestVer >= ver) { - ver = latestVer; - } - - // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] pRange = &pTask->dataRange.range; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + ASSERT(latestVer >= pRange->maxVer); - pRange->minVer = pRange->maxVer + 1; - pRange->maxVer = ver; - if (pRange->minVer == pRange->maxVer) { + int64_t nextStartVer = pRange->maxVer + 1; + if (nextStartVer > latestVer - 1) { + // no input data yet. no need to execute the secondary scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); - tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pId); + tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", pId); + } else { + // 2. 
do secondary scan of the history data, the time window remain, and the version range is updated to + // [pTask->dataRange.range.maxVer, ver1] + pRange->minVer = nextStartVer; + pRange->maxVer = latestVer - 1; } } - + if (!streamTaskRecoverScanStep1Finished(pTask)) { tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window); } - if(!streamTaskRecoverScanStep2Finished(pTask)) { - + if (!streamTaskRecoverScanStep2Finished(pTask)) { streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); @@ -1215,7 +1214,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pWindow->ekey); } else { tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 - ", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); + ", window:%" PRId64 " - %" PRId64, + pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } code = streamTaskScanHistoryDataComplete(pTask); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b04e643d29..165448fb7b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -248,7 +248,7 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb STbData* piMemTbData); static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t* pLevel); -static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, 
int8_t level); +static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); @@ -775,7 +775,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->order = pCond->order; pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; - pReader->verRange = getQueryVerRange(pVnode, pCond, level); + pReader->verRange = getQueryVerRange(pVnode, pCond, idstr); pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket @@ -3721,7 +3721,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret return VND_TSDB(pVnode); } -SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) { +SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id) { int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion; int64_t endVer = 0; @@ -3732,6 +3732,9 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ endVer = (pCond->endVersion > pVnode->state.applied) ? 
pVnode->state.applied : pCond->endVersion; } + tsdbDebug("queried verRange:%"PRId64"-%"PRId64", revised query verRange:%"PRId64"-%"PRId64", %s", pCond->startVersion, + pCond->endVersion, startVer, endVer, id); + return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d67088ebe1..a3d94a0891 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -892,7 +892,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = false; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 1. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); @@ -911,7 +911,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 2. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 2. 
set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bfe4ed0533..689a3afeb1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1783,28 +1783,28 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamScanInfo* pInfo = pOperator->info; + SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; qDebug("stream scan started, %s", id); - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || - pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + memcpy(&pTSInfo->base.cond, &pStreamInfo->tableCond, sizeof(SQueryTableDataCond)); - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { - pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer; - pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer; + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1) { + pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; + pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; - pTSInfo->base.cond.twindows = pTaskInfo->streamInfo.fillHistoryWindow; + pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, 
pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { - pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer; - pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer; + pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; + pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, id); - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2; } pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); @@ -1814,11 +1814,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pTaskInfo->streamInfo.recoverScanFinished = false; + pStreamInfo->recoverScanFinished = false; } - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 || - pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) { + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 || + pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) { if (pInfo->blockRecoverContiCnt > 100) { pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt; pInfo->blockRecoverContiCnt = 0; @@ -1869,11 +1869,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->blockRecoverContiCnt++; calBlockTbName(pInfo, pInfo->pRecoverRes); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) { + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } else { 
- pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer.maxVer); + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); } } @@ -1887,7 +1887,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); @@ -1896,7 +1896,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.startVersion = -1; pTSInfo->base.cond.endVersion = -1; - pTaskInfo->streamInfo.recoverScanFinished = true; + pStreamInfo->recoverScanFinished = true; return NULL; } @@ -1915,7 +1915,7 @@ FETCH_NEXT_BLOCK: SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; if (pBlock->info.parTbName[0]) { - pAPI->stateStore.streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); + pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName); } // TODO move into scan @@ -2097,7 +2097,7 @@ FETCH_NEXT_BLOCK: doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); { // do additional time window filter - STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; + STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow; if (pWindow->skey != INT64_MIN) { qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 8e4d50bcf2..9ded58597f 100644 --- a/source/libs/stream/src/streamRecover.c +++ 
b/source/libs/stream/src/streamRecover.c @@ -50,9 +50,8 @@ const char* streamGetTaskStatusStr(int32_t status) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; - qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), - pRange->minVer, pRange->maxVer); + qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, + pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); streamSetParamForScanHistoryData(pTask); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); @@ -671,8 +670,8 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { pRange->range.maxVer); } else { SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 - " - %" PRId64, + qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 + ", ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } -- GitLab