提交 ff2bf356 编写于 作者: H Haojun Liao

fix(stream): fix an error in setting the version range for the secondary scan.

上级 86b1e495
......@@ -1061,7 +1061,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char* msg = pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
......@@ -1107,7 +1107,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
SStreamTask* pStreamTask = NULL;
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
......@@ -1120,9 +1120,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// wait for the stream task get ready for scan history data
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
pTask->id.idStr, pTask->info.taskLevel, pStreamTask->id.idStr);
taosMsleep(100);
}
......@@ -1132,36 +1133,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->info.taskLevel, pId);
// if it's an source task, extract the last version in wal.
int64_t ver = pTask->dataRange.range.maxVer + 1;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
if (latestVer >= ver) {
ver = latestVer;
}
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
pRange = &pTask->dataRange.range;
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
ASSERT(latestVer >= pRange->maxVer);
pRange->minVer = pRange->maxVer + 1;
pRange->maxVer = ver;
if (pRange->minVer == pRange->maxVer) {
int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) {
// no input data yet; no need to execute the secondary scan while the stream task is halted
streamTaskRecoverSetAllStepFinished(pTask);
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pId);
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", pId);
} else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer;
pRange->maxVer = latestVer - 1;
}
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
" do secondary scan-history-data after halt the related stream task:%s",
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
}
if(!streamTaskRecoverScanStep2Finished(pTask)) {
if (!streamTaskRecoverScanStep2Finished(pTask)) {
streamSourceScanHistoryData(pTask);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
......@@ -1215,7 +1214,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pWindow->ekey);
} else {
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
", window:%" PRId64 " - %" PRId64,
pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
}
code = streamTaskScanHistoryDataComplete(pTask);
......
......@@ -248,7 +248,7 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
......@@ -775,7 +775,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->order = pCond->order;
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->verRange = getQueryVerRange(pVnode, pCond, idstr);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
......@@ -3721,7 +3721,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
return VND_TSDB(pVnode);
}
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, const char* id) {
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
int64_t endVer = 0;
......@@ -3732,6 +3732,9 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
}
tsdbDebug("queried verRange:%"PRId64"-%"PRId64", revised query verRange:%"PRId64"-%"PRId64", %s", pCond->startVersion,
pCond->endVersion, startVer, endVer, id);
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
......
......@@ -892,7 +892,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 1. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
......@@ -911,7 +911,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
......
......@@ -1783,28 +1783,28 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStreamScanInfo* pInfo = pOperator->info;
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
qDebug("stream scan started, %s", id);
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
memcpy(&pTSInfo->base.cond, &pStreamInfo->tableCond, sizeof(SQueryTableDataCond));
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer;
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
pTSInfo->base.cond.twindows = pTaskInfo->streamInfo.fillHistoryWindow;
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer;
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
}
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
......@@ -1814,11 +1814,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverScanFinished = false;
pStreamInfo->recoverScanFinished = false;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) {
if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
......@@ -1869,11 +1869,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} else {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer.maxVer);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
}
}
......@@ -1887,7 +1887,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
}
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
......@@ -1896,7 +1896,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = -1;
pTSInfo->base.cond.endVersion = -1;
pTaskInfo->streamInfo.recoverScanFinished = true;
pStreamInfo->recoverScanFinished = true;
return NULL;
}
......@@ -1915,7 +1915,7 @@ FETCH_NEXT_BLOCK:
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = pPacked->pDataBlock;
if (pBlock->info.parTbName[0]) {
pAPI->stateStore.streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
}
// TODO move into scan
......@@ -2097,7 +2097,7 @@ FETCH_NEXT_BLOCK:
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
{ // do additional time window filter
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow;
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
......
......@@ -50,9 +50,8 @@ const char* streamGetTaskStatusStr(int32_t status) {
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range;
qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
pRange->minVer, pRange->maxVer);
qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
streamSetParamForScanHistoryData(pTask);
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
......@@ -671,8 +670,8 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册