diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cee3438280ada63a6759902eae840b933d02b1d3..e421a5d671094f1309eb5391062d9e2870f1004b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -204,7 +204,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, } int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) { - int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b3e5dc247556517426f30e3b0f6480f04330d216..5f85abb6b668c0936c006df570b090cd37287812 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -301,11 +301,11 @@ int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) { qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); SCheckpointInfo* pCkInfo = &pTask->chkInfo; - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 - " -> %" PRId64, - pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); +// qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 +// " -> %" PRId64, +// pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); pCkInfo->keptCheckpointId = checkpointId; - pCkInfo->version = dataVer; +// pCkInfo->version = dataVer; return TSDB_CODE_SUCCESS; } @@ -418,6 +418,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); + int64_t currentVer = pTask->chkInfo.currentVer; { // set input @@ -433,6 +434,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + ASSERT(currentVer < pSubmit->submit.ver); + currentVer = pSubmit->submit.ver; } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; @@ -445,8 +448,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, + numOfBlocks, pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + ASSERT(currentVer < pMerged->ver); + currentVer = pMerged->ver; } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); @@ -466,6 +472,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, resSize / 1048576.0, totalBlocks); + // update the currentVer if processing the submit blocks. + if(currentVer > pTask->chkInfo.currentVer) { + qDebug("s-task:%s update currentVer from %" PRId64 " to %" PRId64, pTask->id.idStr, + pTask->chkInfo.currentVer, currentVer); + pTask->chkInfo.currentVer = currentVer; + } + int32_t type = pInput->type; streamFreeQitem(pInput); @@ -524,15 +537,16 @@ int32_t streamTryExec(SStreamTask* pTask) { if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId); + qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%"PRId64, pMeta->vgId); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - code = updateCheckPointInfo(pTask, pTask->checkpointingId); - if (code != TSDB_CODE_SUCCESS) { - return code; - } +// code = updateCheckPointInfo(pTask, pTask->checkpointingId); +// if (code != TSDB_CODE_SUCCESS) { +// return code; +// } } + // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask);