提交 87ba6cb0 编写于 作者: 5 54liuyao

fix:fill history check update

上级 72680094
...@@ -2044,8 +2044,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -2044,8 +2044,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
// len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var); len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
// if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var); len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
......
...@@ -121,7 +121,8 @@ enum { ...@@ -121,7 +121,8 @@ enum {
STREAM_RECOVER_STEP__NONE = 0, STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE1, STREAM_RECOVER_STEP__PREPARE1,
STREAM_RECOVER_STEP__PREPARE2, STREAM_RECOVER_STEP__PREPARE2,
STREAM_RECOVER_STEP__SCAN, STREAM_RECOVER_STEP__SCAN1,
STREAM_RECOVER_STEP__SCAN2,
}; };
typedef struct { typedef struct {
......
...@@ -1724,9 +1724,9 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { ...@@ -1724,9 +1724,9 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
} }
} }
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
if (pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true); checkUpdateData(pInfo, true, pBlock, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
...@@ -1758,11 +1758,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1758,11 +1758,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion); pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else { } else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion, qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion); pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
} }
/*resetTableScanInfo(pTSInfo, pWin);*/ /*resetTableScanInfo(pTSInfo, pWin);*/
...@@ -1772,11 +1774,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1772,11 +1774,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
pTaskInfo->streamInfo.recoverScanFinished = false; pTaskInfo->streamInfo.recoverScanFinished = false;
} }
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
if (pInfo->blockRecoverContiCnt > 100) { if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt; pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0; pInfo->blockRecoverContiCnt = 0;
...@@ -1789,6 +1791,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1789,6 +1791,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRecoverRes, "scan recover"); printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes; return pInfo->pRecoverRes;
} break; } break;
case STREAM_SCAN_FROM_UPDATERES: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update");
calBlockTbName(pInfo, pSDB);
return pSDB;
}
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
default: default:
break; break;
} }
...@@ -1798,8 +1821,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1798,8 +1821,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->blockRecoverContiCnt++; pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pInfo->pRecoverRes); calBlockTbName(pInfo, pInfo->pRecoverRes);
if (pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} else {
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
}
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
...@@ -1910,7 +1937,7 @@ FETCH_NEXT_BLOCK: ...@@ -1910,7 +1937,7 @@ FETCH_NEXT_BLOCK:
switch (pInfo->scanMode) { switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: { case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey); doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1; pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
...@@ -2011,7 +2038,7 @@ FETCH_NEXT_BLOCK: ...@@ -2011,7 +2038,7 @@ FETCH_NEXT_BLOCK:
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
doCheckUpdate(pInfo, pBlockInfo->window.ekey); doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1; pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
......
...@@ -4805,10 +4805,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4805,10 +4805,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
} }
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册