diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 192bebe95aa05b59c8ef27d444648be429ebb8a8..1e9f0ddf8a003b157babd9c240898db5a377326e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -41,13 +41,12 @@ typedef struct { } SLocalFetch; typedef struct { - void* tqReader; - void* config; + void* tqReader; // todo remove it void* vnode; void* mnd; SMsgCb* pMsgCb; int64_t version; - bool initMetaReader; + uint64_t checkpointId; bool initTableReader; bool initTqReader; int32_t numOfVgroups; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c1a4afa5661fff29c0e290219f121c2c551ea57d..c584f6e8233fe25bfece4627fde9b3e8d6b2cd99 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -258,9 +258,9 @@ typedef struct SStreamId { } SStreamId; typedef struct SCheckpointInfo { - int64_t keptCheckpointId; - int64_t version; // latest checkpointId version - int64_t currentVer; // current offset in WAL, not serialize it + int64_t checkpointId; + int64_t checkpointVer; // latest checkpointId version + int64_t currentVer; // current offset in WAL, not serialize it } SCheckpointInfo; typedef struct SStreamStatus { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 39f0a81d499766f01cd8598e4e81407891aba3be..b8b7e8e172cb20cdaef467a6d8a0e96d4592d4bf 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -92,7 +92,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, - pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); + pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8a905566f3c074b25abb11f4a120a401316bc1d9..1ac096cc74f78aad9f38648be3df5032cd84d795 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -758,12 +758,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMeta = pTq->pStreamMeta; // checkpoint exists, restore from the last checkpoint - if (pTask->chkInfo.keptCheckpointId != 0) { - ASSERT(pTask->chkInfo.version > 0); - pTask->chkInfo.currentVer = pTask->chkInfo.version; - pTask->dataRange.range.maxVer = pTask->chkInfo.version; - pTask->dataRange.range.minVer = pTask->chkInfo.version; - pTask->chkInfo.currentVer = pTask->chkInfo.version; + if (pTask->chkInfo.checkpointId != 0) { + ASSERT(pTask->chkInfo.checkpointVer > 0); + pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer; + pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer; + pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer; } else { pTask->chkInfo.currentVer = ver; pTask->dataRange.range.maxVer = ver; @@ -785,7 +784,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } SReadHandle handle = { - .version = pTask->chkInfo.currentVer, + .checkpointId = pTask->chkInfo.checkpointId, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, @@ -817,7 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList); SReadHandle handle = { - .version = pTask->chkInfo.currentVer, + .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState, @@ -871,12 +870,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer, + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); - if (pTask->chkInfo.keptCheckpointId != 0) { + if (pTask->chkInfo.checkpointId != 0) { tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer); + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); } return 0; @@ -1277,7 +1276,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.version); + pTask->chkInfo.checkpointVer); streamProcessRunReq(pTask); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ef24e854460e7713680c69dba27fa49116cf4849..1bacc257d06b5ffad4d3200a106c15da9a7b9f08 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -295,7 +295,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { SEncoder *pCoder = &(SEncoder){0}; SDeleteRes res = {0}; - SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; initStorageAPI(&handle.api); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); @@ -580,7 +580,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb}; initStorageAPI(&handle.api); switch (pMsg->msgType) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 455a91b6be7847c793f31a1867ff29f61bd5553b..d770a0939598bd8433579d8459b9b0b99a74c0a9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -241,8 +241,8 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); - ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId); - p->chkInfo.keptCheckpointId = p->checkpointingId; + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + p->chkInfo.checkpointId = p->checkpointingId; int8_t prev = p->status.taskStatus; p->status.taskStatus = TASK_STATUS__NORMAL; @@ -250,7 +250,7 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { streamMetaSaveTask(pMeta, p); qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer, + pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, streamGetTaskStatusStr(prev)); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 36bc4b1968b1f8ae4ef213957dfca2fb1dcca577..983edfb30d8bc2e17bd3c09db352c6205387479f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -482,8 +482,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamQueueItem* pItem = pInput; qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type); - int64_t ver = pTask->chkInfo.version; - doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.version, id); + int64_t ver = pTask->chkInfo.checkpointVer; + doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.checkpointVer, id); int64_t resSize = 0; int32_t totalBlocks = 0; @@ -494,11 +494,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { resSize / 1048576.0, totalBlocks); // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version); + ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer); - if (ver != pTask->chkInfo.version) { + if (ver != pTask->chkInfo.checkpointVer) { qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, - pTask->chkInfo.version); + pTask->chkInfo.checkpointVer); } streamFreeQitem(pInput); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6b3f917176d14ab6688b417ebbe019d98dde7ee2..311b8d3d911ddd3e92e29827cbd333a1a1e30071 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -434,7 +434,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { // remove duplicate void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index aec6e4b44685a7dfd8a83a6f43f225e05873cb77..ba4d1e1cd7dda2848a96d5ee3188610602fefd73 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -85,8 +85,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.keptCheckpointId) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; @@ -148,8 +148,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.keptCheckpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;