diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a935eaf5f737d22be1ca792955c5ba44398411c3..3bcc141edc4ba837f2bb88cefdca665423516615 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1322,13 +1322,14 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return -1; } - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId); - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); if (remain > 0) { - tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp", + pTask->id.idStr, req.downstreamId, remain); } else { - tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr); + tqDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", pTask->id.idStr, req.downstreamId); streamProcessScanHistoryFinishRsp(pTask); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 7743c78fbccdf729feae31c4a52c55036e1f741c..d5d8ba130c632b0b50caa607ffcd7084b37cde28 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2569,9 +2569,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; + if (pScanInfo == NULL) { + tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr); + bool hasNexTable = moveToNextTable(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + + continue; + } + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -2580,9 +2589,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - // reset the index in last block when handing a new file - // doCleanupTableScanInfo(pScanInfo); - bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c8d9ed59f57320b01474ae0c05af978089217ec..a15b128a99766fd1984b27daa832ca619a861f74 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - // apply additional time window filter doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {