From 76b5c08f1bdbbdd722990b04f1fee277598f3272 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 May 2023 05:53:25 +0000 Subject: [PATCH] add checkpoint --- include/common/tmsgdef.h | 4 +- source/dnode/mnode/impl/src/mndStream.c | 16 ++--- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/tq/tq.c | 80 +++++++++++++------------ 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1f2d597496..8761f089a0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -174,13 +174,13 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) - // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) - // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 68b8dd7201..8216eecf1e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -70,8 +70,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); - // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); - // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); @@ -792,8 +792,6 @@ _OVER: return code; } -#if 0 - static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -834,8 +832,8 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con req.checkpointId = pMsg->checkpointId; req.nodeId = pTask->nodeId; req.expireTime = -1; - req.streamId = pTask->streamId; - req.taskId = pTask->taskId; + req.streamId = pTask->id.streamId; + req.taskId = pTask->id.taskId; int32_t code; int32_t blen; @@ -880,7 +878,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName); if (pStream == NULL || pStream->uid != pMsg->streamId) { - mError("start checkpointing failed since stream %s not found", pMsg->streamName); + mError("failed to checkpoint since stream %s not found", pMsg->streamName); return -1; } @@ -889,6 +887,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (pTrans == NULL) return -1; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mError("failed to checkpoin since stream %s not found", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); return -1; @@ -958,8 +957,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { return 0; } -#endif - static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -1401,7 +1398,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } - static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d7f0ef041a..1553c84e2e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -197,7 +197,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); @@ -220,6 +220,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); @@ -228,6 +229,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); + int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f6f2b3ec53..91a6a1b25b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,7 +71,7 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } - if(pData->msg != NULL) { + if (pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); taosMemoryFree(pData->msg); pData->msg = NULL; @@ -233,14 +233,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, + ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, + dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -256,8 +257,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, + pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -332,8 +333,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); if (pHandle == NULL) { - tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, - pOffset->subKey); + tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -349,7 +349,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) } taosRUnLockLatch(&pTq->lock); - //3. check the offset info + // 3. check the offset info STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL) { if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { @@ -377,7 +377,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); } else { - tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); + tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); } if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { @@ -525,7 +525,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; if (reqOffset.type == TMQ_OFFSET__LOG) { - dataRsp.rspOffset.version = currentVer; // return current consume offset value + dataRsp.rspOffset.version = currentVer; // return current consume offset value } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { @@ -545,16 +545,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; -// taosWLockLatch(&pTq->lock); -// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); -// if (code != 0) { -// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); -// } -// taosWUnLockLatch(&pTq->lock); + // taosWLockLatch(&pTq->lock); + // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); + // if (code != 0) { + // tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); + // } + // taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { @@ -618,7 +618,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int ret = 0; + int ret = 0; SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); @@ -730,7 +730,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo); } @@ -745,9 +744,7 @@ end: return ret; } -void freePtr(void *ptr) { - taosMemoryFree(*(void**)ptr); -} +void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); @@ -769,7 +766,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; + pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -833,8 +830,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, - pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, + pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -1052,7 +1049,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t // do recovery step 2 int64_t st = taosGetTimestampMs(); - tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st); code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { @@ -1080,7 +1077,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - double el = (taosGetTimestampMs() - st)/ 1000.0; + double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task @@ -1265,8 +1262,11 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); break; - }else{ - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + } else { + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, + .pCont = pHandle->msg->pCont, + .contLen = pHandle->msg->contLen, + .info = pHandle->msg->info}; tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); taosMemoryFree(pHandle->msg); pHandle->msg = NULL; @@ -1297,8 +1297,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.version); + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.version); streamProcessRunReq(pTask); } else { tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); @@ -1356,7 +1356,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); @@ -1368,18 +1368,18 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version if (pReq->igUntreated) { // discard all the data when the stream task is suspended. pTask->chkInfo.currentVer = sversion; - tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, + pTq->pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion); } else { // from the previous paused version and go on - tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, + pTq->pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1523,3 +1523,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + return 0; + return 0; +} \ No newline at end of file -- GitLab