diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b1c93104fad0b14975e308184c6e3afc0e09ae85..fcd54767711939e8abced12b5cddc6c36b23c94a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2120,8 +2120,7 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - SSDataBlock* pBlock = pInfo->pRes; - SDataBlockInfo* pBlockInfo = &pBlock->info; + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: @@ -2145,35 +2144,36 @@ FETCH_NEXT_BLOCK: } } - blockDataCleanup(pBlock); + blockDataCleanup(pInfo->pRes); while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { SSDataBlock* pRes = NULL; int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); - qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, - id); + qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id); if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { qDebug("retrieve data failed, try next block in submit block, %s", id); continue; } - setBlockIntoRes(pInfo, pRes, false); + // filter the block extracted from WAL files, according to the time window + // apply additional time window filter + doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + if (pRes->info.rows == 0) { + continue; + } + setBlockIntoRes(pInfo, pRes, false); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); return pInfo->pCreateTbRes; } - // apply additional time window filter - doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); @@ -2195,7 +2195,7 @@ FETCH_NEXT_BLOCK: qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { - return pBlock; + return pInfo->pRes; } if (pInfo->pUpdateDataRes->info.rows > 0) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f9bbe96d9775b5734d92a2d9e878d6b539daa2a5..223e1858006c78defcb534c1137e8ea128cee27f 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -80,6 +80,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { walReaderGetCurrentVer(pTask->exec.pWalReader)); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + streamTaskEnablePause(pTask); streamSetParamForScanHistory(pTask); streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {