diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index dce358ab6de1db6cc87febf527faf67ed368e9a7..953d61495141e9b50d29313c030e7b74fc63a482 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags void printDataBlock(SSDataBlock* pBlock, const char* flag) { if (!pBlock || pBlock->info.rows == 0) { - qDebug("===stream===printDataBlock: Block is Null or Empty"); + qDebug("===stream===%s: Block is Null or Empty", flag); return; } char* pBuf = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce647014aee772bed9fc19b01ad3f438d4857743..5dff1abb9799f43eddd14847cbca26bc1936cc78 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1803,6 +1803,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + printDataBlock(pInfo->pUpdateRes, "recover update"); return pInfo->pUpdateRes; } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: { @@ -1813,7 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - // printDataBlock(pSDB, "stream scan update"); + printDataBlock(pSDB, "scan recover update"); calBlockTbName(pInfo, pSDB); return pSDB; } @@ -1838,6 +1839,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; + printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); return pInfo->pCreateTbRes; } qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cb610ad6b536fb4b3fb9ec6a97911a829a3f0b6b..25b265636586bf74e6cb836b89308f6cd20d4122 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (batchCnt >= batchSz) break; } if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - break; + if (finished) { + taosArrayDestroy(pRes); + qDebug("task %d finish recover exec task ", pTask->taskId); + break; + } else { + qDebug("task %d continue recover exec task ", pTask->taskId); + continue; + } } SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) {