提交 21854648 编写于 作者: 5 54liuyao

fix:scan of fill history ended prematurely

上级 cd498627
...@@ -2041,7 +2041,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -2041,7 +2041,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty"); qDebug("===stream===%s: Block is Null or Empty", flag);
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;
......
...@@ -1798,6 +1798,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1798,6 +1798,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} break; } break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: { case STREAM_SCAN_FROM_DATAREADER_RANGE: {
...@@ -1808,7 +1809,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1808,7 +1809,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update"); printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
...@@ -1833,6 +1834,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1833,6 +1834,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
......
...@@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (batchCnt >= batchSz) break; if (batchCnt >= batchSz) break;
} }
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes); if (finished) {
break; taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->taskId);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->taskId);
continue;
}
} }
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) { if (qRes == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册