diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 453e4a8d8e5c13a7bd503bd491c2b998b4d7d286..3c8d394b4396776b03d3557e49aa7d87c29ffccc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2044,8 +2044,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_DOUBLE: - // len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var); - // if (len >= size - 1) return dumpBuf; + len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var); + if (len >= size - 1) return dumpBuf; break; case TSDB_DATA_TYPE_BOOL: len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0b55eb4a457fbad294478e5b490fb1600903333c..55bf8b37d0272af9bff367b8067b45f841643b4a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -121,7 +121,8 @@ enum { STREAM_RECOVER_STEP__NONE = 0, STREAM_RECOVER_STEP__PREPARE1, STREAM_RECOVER_STEP__PREPARE2, - STREAM_RECOVER_STEP__SCAN, + STREAM_RECOVER_STEP__SCAN1, + STREAM_RECOVER_STEP__SCAN2, }; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2f3b75724115fe7a79a62bc6d073d6dd47068c1e..fccb53f0cdbced43e201cc4852d5ab80d2ba0969 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1724,9 +1724,9 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { } } -static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) { +static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { if (pInfo->pUpdateInfo) { - checkUpdateData(pInfo, true, pInfo->pRes, true); + checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { pInfo->updateResIndex = 0; @@ -1758,11 +1758,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion); + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } /*resetTableScanInfo(pTSInfo, pWin);*/ @@ -1772,11 +1774,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; pTaskInfo->streamInfo.recoverScanFinished = false; } - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { + if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 || + pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) { if (pInfo->blockRecoverContiCnt > 100) { pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt; pInfo->blockRecoverContiCnt = 0; @@ -1789,6 +1791,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } break; + case STREAM_SCAN_FROM_UPDATERES: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + return pInfo->pUpdateRes; + } break; + case STREAM_SCAN_FROM_DATAREADER_RANGE: { + SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + if (pSDB) { + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); + pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + checkUpdateData(pInfo, true, pSDB, false); + // printDataBlock(pSDB, "stream scan update"); + calBlockTbName(pInfo, pSDB); + return pSDB; + } + blockDataCleanup(pInfo->pUpdateDataRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } break; default: break; } @@ -1798,8 +1821,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->blockRecoverContiCnt++; calBlockTbName(pInfo, pInfo->pRecoverRes); if (pInfo->pUpdateInfo) { - TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) { + TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + } else { + doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); + } } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; @@ -1910,7 +1937,7 @@ FETCH_NEXT_BLOCK: switch (pInfo->scanMode) { case STREAM_SCAN_FROM_RES: { pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey); + doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2011,7 +2038,7 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6411d862ae20ba89b3de24a7c400d934a7a69180..bfdf5ea89b197d6e59f124d627dd3b857c380b99 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4805,10 +4805,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); } + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); - - doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); } pOperator->status = OP_RES_TO_RETURN; removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);