提交 cbda61c7 编写于 作者: H Haojun Liao

fix(stream): fix error in checkpointing.

上级 184b2d64
......@@ -204,7 +204,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
}
int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) {
int32_t code;
int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId;
......
......@@ -301,11 +301,11 @@ int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) {
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64
" -> %" PRId64,
pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId);
// qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64
// " -> %" PRId64,
// pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId);
pCkInfo->keptCheckpointId = checkpointId;
pCkInfo->version = dataVer;
// pCkInfo->version = dataVer;
return TSDB_CODE_SUCCESS;
}
......@@ -418,6 +418,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t st = taosGetTimestampMs();
qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
int64_t currentVer = pTask->chkInfo.currentVer;
{
// set input
......@@ -433,6 +434,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT(currentVer < pSubmit->submit.ver);
currentVer = pSubmit->submit.ver;
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
......@@ -445,8 +448,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
numOfBlocks, pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT(currentVer < pMerged->ver);
currentVer = pMerged->ver;
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
......@@ -466,6 +472,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, el, resSize / 1048576.0, totalBlocks);
// update the currentVer if processing the submit blocks.
if(currentVer > pTask->chkInfo.currentVer) {
qDebug("s-task:%s update currentVer from %" PRId64 " to %" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer, currentVer);
pTask->chkInfo.currentVer = currentVer;
}
int32_t type = pInput->type;
streamFreeQitem(pInput);
......@@ -524,15 +537,16 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId);
qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%"PRId64, pMeta->vgId);
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
code = updateCheckPointInfo(pTask, pTask->checkpointingId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// code = updateCheckPointInfo(pTask, pTask->checkpointingId);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
}
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册