diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e8a8dee866507b550a9e010c516dce52709e3b2d..9ffd74c229d89e3eed2e9075c63a3b1077a5be3e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -46,14 +46,14 @@ typedef struct { } SNodeInfo; typedef struct { - int selfIndex; - int replica; + int32_t selfIndex; + int32_t replica; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCluster; typedef struct { int32_t selfIndex; - int replica; + int32_t replica; SNodeInfo node[TSDB_MAX_REPLICA]; ESyncRole role[TSDB_MAX_REPLICA]; } SNodesRole; @@ -62,20 +62,20 @@ typedef struct SSyncFSM { void* pData; // apply committed log, bufs will be free by raft module - int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); + int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); // cluster commit callback - int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); + int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); // fsm return snapshot in ppBuf, bufs will be free by raft module // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast); + int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); // fsm apply snapshot with pBuf data - int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast); + int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast); // call when restore snapshot and log done - int (*onRestoreDone)(struct SSyncFSM* fsm); + int32_t (*onRestoreDone)(struct SSyncFSM* fsm); void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf); @@ -118,9 +118,9 @@ typedef struct SSyncClusterConfig { typedef struct SStateManager { void* pData; - void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state); + int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state); - const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); + int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); @@ -148,9 +148,9 @@ void syncStop(const SSyncNode*); int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); -//int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); +// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); -//int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); +// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); extern int32_t syncDebugFlag; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 9a3310922d210c36751510476c529c3a0c2691aa..143bdf07106aaa4c51f57278440d902f36f87676 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -44,41 +44,41 @@ typedef struct { EWalType walLevel; // wal level } SWalCfg; -typedef void * twalh; // WAL HANDLE -typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); +struct SWal; +typedef struct SWal SWal; // WAL HANDLE +typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); -//module initialization -int32_t walInit(); -void walCleanUp(); +// module initialization +int32_t walInit(); +void walCleanUp(); -//handle open and ctl -twalh walOpen(char *path, SWalCfg *pCfg); -int32_t walAlter(twalh, SWalCfg *pCfg); -void walStop(twalh); -void walClose(twalh); +// handle open and ctl +SWal *walOpen(char *path, SWalCfg *pCfg); +int32_t walAlter(SWal *, SWalCfg *pCfg); +void walClose(SWal *); -//write -//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); -int64_t walWrite(twalh, void* body, int32_t bodyLen); -int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize); +// write +// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen); +int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); -//apis for lifecycle management -void walFsync(twalh, bool force); -int32_t walCommit(twalh, int64_t ver); -//truncate after -int32_t walRollback(twalh, int64_t ver); -//notify that previous log can be pruned safely -int32_t walPrune(twalh, int64_t ver); +// apis for lifecycle management +void walFsync(SWal *, bool force); +int32_t walCommit(SWal *, int64_t ver); +// truncate after +int32_t walRollback(SWal *, int64_t ver); +// notify that previous log can be pruned safely +int32_t walPrune(SWal *, int64_t ver); -//read -int32_t walRead(twalh, SWalHead **, int64_t ver); -int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); +// read +int32_t walRead(SWal *, SWalHead **, int64_t ver); +int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); -//lifecycle check -int32_t walFirstVer(twalh); -int32_t walPersistedVer(twalh); -int32_t walLastVer(twalh); -//int32_t walDataCorrupted(twalh); +// lifecycle check +int32_t walFirstVer(SWal *); +int32_t walPersistedVer(SWal *); +int32_t walLastVer(SWal *); +// int32_t walDataCorrupted(SWal*); #ifdef __cplusplus } diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index 8c0fc2b775497c5f1588eb2fa34db7f4eedd00ff..9331cce20ba5391198677b7e76358309677d6c6b 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -19,6 +19,18 @@ int32_t walInit() { return 0; } void walCleanUp() {} -twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; } +SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; } -int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; } \ No newline at end of file +int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } + +void walClose(SWal *pWal) {} + +void walFsync(SWal *pWal, bool force) {} + +int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {} + +int32_t walCommit(SWal *pWal, int64_t ver) { return 0; } + +int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } + +int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } \ No newline at end of file diff --git a/source/server/vnode/inc/vnodeFile.h b/source/server/vnode/inc/vnodeFile.h index 31364d8c03471c703891432c92318b7ea0de61c0..bea28324eed9264cfc20985743ea76a9cbb00099 100644 --- a/source/server/vnode/inc/vnodeFile.h +++ b/source/server/vnode/inc/vnodeFile.h @@ -23,8 +23,8 @@ extern "C" { int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg); int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg); -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState); -int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState); +int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState); +int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index ac6c77041f42f2dda3b5f4898a18a67136183aa5..90d9e7105e135e2659abb2e8887656dee83f654c 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -79,7 +79,7 @@ typedef struct { SMeta *pMeta; STsdb *pTsdb; STQ *pTQ; - twalh pWal; + SWal *pWal; void *pQuery; SSyncNode *pSync; taos_queue pWriteQ; // write queue diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index a77c99ec3442da097c1ec5abe641a6773eeda12e..ddcbd2689df91fbcc224e667e785ae2be643e01f 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -296,7 +296,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { return TSDB_CODE_SUCCESS; } -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { +int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState) { int32_t ret = TSDB_CODE_VND_APP_ERROR; int32_t len = 0; int32_t maxLen = 100; @@ -305,7 +305,7 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { FILE *fp = NULL; char file[PATH_MAX + 30] = {0}; - sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); + sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId); len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { @@ -343,9 +343,9 @@ PARSE_TERM_ERROR: return ret; } -int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { +int32_t vnodeSaveState(int32_t vgId, SSyncServerState *pState) { char file[PATH_MAX + 30] = {0}; - sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); + sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId); FILE *fp = fopen(file, "w"); if (!fp) { diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index c08ae7708a1a5b7179879d6830d7550528aae6d5..ced93ea6a7c0f0a84f9edb8c8450aa30347a1863 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -130,7 +130,8 @@ static void vnodeDestroyVnode(SVnode *pVnode) { } if (pVnode->pWal) { - // todo + walClose(pVnode->pWal); + pVnode->pWal = NULL; } if (pVnode->allocator) { @@ -166,6 +167,56 @@ static void vnodeCleanupVnode(SVnode *pVnode) { vnodeRelease(pVnode); } +static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walWrite(pVnode->pWal, index, pBuf->data, (int32_t)pBuf->len); +} + +static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walCommit(pVnode->pWal, index); +} + +static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walPrune(pVnode->pWal, index); +} + +static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) { + SVnode *pVnode = logStore->pData; // vnode status can be checked here + return walRollback(pVnode->pWal, index); +} + +static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) { + SVnode *pVnode = stateMng->pData; + return vnodeSaveState(pVnode->vgId, pState); +} + +static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) { + SVnode *pVnode = stateMng->pData; + return vnodeSaveState(pVnode->vgId, pState); +} + +static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { + return 0; +} + +static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) { return 0; } + +static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) { + return 0; +} + +static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) { + return 0; +} + +static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) { return 0; } + +static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {} + +static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {} + static int32_t vnodeOpenVnode(int32_t vgId) { int32_t code = 0; @@ -193,7 +244,7 @@ static int32_t vnodeOpenVnode(int32_t vgId) { return 0; } - code = vnodeReadTerm(vgId, &pVnode->term); + code = vnodeSaveState(vgId, &pVnode->term); if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code)); pVnode->cfg.dropped = 1; @@ -220,25 +271,24 @@ static int32_t vnodeOpenVnode(int32_t vgId) { // create sync node SSyncInfo syncInfo = {0}; syncInfo.vgId = vgId; - syncInfo.snapshotIndex = 0; // todo, from tsdb + syncInfo.snapshotIndex = 0; // todo, from tsdb memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster)); syncInfo.fsm.pData = pVnode; - syncInfo.fsm.applyLog = NULL; - syncInfo.fsm.onClusterChanged = NULL; - syncInfo.fsm.getSnapshot = NULL; - syncInfo.fsm.applySnapshot = NULL; - syncInfo.fsm.onRestoreDone = NULL; - syncInfo.fsm.onRollback = NULL; + syncInfo.fsm.applyLog = vnodeApplyLog; + syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged; + syncInfo.fsm.getSnapshot = vnodeGetSnapshot; + syncInfo.fsm.applySnapshot = vnodeApplySnapshot; + syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone; + syncInfo.fsm.onRollback = vnodeOnRollback; + syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged; syncInfo.logStore.pData = pVnode; - syncInfo.logStore.logWrite = NULL; - syncInfo.logStore.logCommit = NULL; - syncInfo.logStore.logPrune = NULL; - syncInfo.logStore.logRollback = NULL; + syncInfo.logStore.logWrite = vnodeLogWrite; + syncInfo.logStore.logCommit = vnodeLogCommit; + syncInfo.logStore.logPrune = vnodeLogPrune; + syncInfo.logStore.logRollback = vnodeLogRollback; syncInfo.stateManager.pData = pVnode; - syncInfo.stateManager.saveServerState = NULL; - syncInfo.stateManager.readServerState = NULL; - // syncInfo.stateManager.saveCluster = NULL; - // syncInfo.stateManager.readCluster = NULL; + syncInfo.stateManager.saveServerState = vnodeSaveServerState; + syncInfo.stateManager.readServerState = vnodeReadServerState; pVnode->pSync = syncStart(&syncInfo); if (pVnode->pSync == NULL) {