From 6953688761f0e4674cf63f0471482aaf2530b17c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 20 Apr 2022 11:30:18 +0000 Subject: [PATCH] add commit snapshot --- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 15 ++++++++++++++- source/dnode/vnode/src/vnd/vnodeOpen.c | 7 ++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 ++ 5 files changed, 22 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 00bcec7be9..08b97ed6c4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -62,6 +62,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); int64_t vnodeGetSyncHandle(SVnode *pVnode); +void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 06ec03741c..0027424829 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -74,7 +74,7 @@ typedef struct { // SVState struct SVState { - int64_t processed; + // int64_t processed; int64_t committed; int64_t applied; }; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 55e4c5110a..a034b13fbe 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -158,12 +158,23 @@ int vnodeSyncCommit(SVnode *pVnode) { } static int vnodeCommit(void *arg) { - SVnode *pVnode = (SVnode *)arg; + SVnode *pVnode = (SVnode *)arg; + char dir[TSDB_FILENAME_LEN]; + SVnodeInfo info = {0}; + + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + info.config = pVnode->config; + info.state.committed = pVnode->state.applied; + info.state.applied = pVnode->state.applied; + + vnodeSaveInfo(dir, &info); // metaCommit(pVnode->pMeta); tqCommit(pVnode->pTq); tsdbCommit(pVnode->pTsdb); + vnodeCommitInfo(dir, &info); + vnodeBufPoolRecycle(pVnode); tsem_post(&(pVnode->canCommit)); return 0; @@ -185,6 +196,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { const SVState *pState = (SVState *)pObj; if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1; return 0; } @@ -193,6 +205,7 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { SVState *pState = (SVState *)pObj; if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1; + if (tjsonGetNumberValue(pJson, "applied version", pState->applied) < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index dee54be5ac..91a754815e 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -75,8 +75,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->path = (char *)&pVnode[1]; strcpy(pVnode->path, path); pVnode->config = info.config; - pVnode->state.committed = info.state.committed; - pVnode->state.processed = pVnode->state.applied = pVnode->state.committed; + pVnode->state = info.state; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; @@ -170,4 +169,6 @@ void vnodeClose(SVnode *pVnode) { } } -int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } \ No newline at end of file +int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } + +void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fbeb59fe23..56d4ba392c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -23,6 +23,7 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { +#if 0 SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -40,6 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { walFsync(pVnode->pWal, false); +#endif return 0; } -- GitLab