提交 7c9898cf 编写于 作者: L liuyao

adj stream scan

上级 c54bd55c
......@@ -1902,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
} break;
case STREAM_SCAN_FROM_UPDATERES: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
return pInfo->pDeleteDataRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB);
return pSDB;
}
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default:
break;
}
......@@ -1939,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} else {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
}
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册