diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 16945b1403f94b09e71ec6af421376eab3fd87c7..5a1653b937fee8ed4427aa1e4a40b459b110125c 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,9 +75,10 @@ typedef struct { } STelemMgmt; typedef struct { + SWal *pWal; int32_t errCode; + bool restored; sem_t syncSem; - SWal *pWal; int64_t sync; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index fe557cdeac0874dc815b5fe83b795a4b01bbfcec..7f35ff59272048a4e8bea0b0df09ad7428c681b1 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,10 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); +bool mndIsRestored(SMnode *pMnode); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); +void mndSyncStart(SMnode *pMnode); +void mndSyncStop(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 825b52dd9bd78d7fbe6a3952bd3cfc4117b0c54e..b8ee63d05eed013e6a9e2a7418877ffe5d4a44a4 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -68,7 +68,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - tsem_init(&pMgmt->syncSem, 0, 0); char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); @@ -102,6 +101,7 @@ int32_t mndInitSync(SMnode *pMnode) { pNode->nodePort = pMnode->replicas[i].port; } + tsem_init(&pMgmt->syncSem, 0, 0); pMgmt->sync = syncOpen(&syncInfo); if (pMgmt->sync <= 0) { mError("failed to open sync since %s", terrstr()); @@ -146,8 +146,30 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { return pMgmt->errCode; } +void mndSyncStart(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + int64_t lastApplyIndex = sdbGetApplyIndex(pSdb); + + syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); + syncStart(pMnode->syncMgmt.sync); + + int64_t applyIndex = sdbGetApplyIndex(pSdb); + mndTransPullup(pMnode); + mDebug("pullup trans finished, applyIndex:%" PRId64, applyIndex); + if (applyIndex != lastApplyIndex) { + mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastApplyIndex, applyIndex); + sdbWriteFile(pSdb); + } + + pMnode->syncMgmt.restored = true; +} + +void mndSyncStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); } + bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->state = syncGetMyRole(pMgmt->sync); return pMgmt->state == TAOS_SYNC_STATE_LEADER; } + +bool mndIsRestored(SMnode *pMnode) { return pMnode->syncMgmt.restored; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8e19b023cc160b34de0d7110a18c61fbb492fa22..e4a29365e74a9fb047a26e3f58a10729ad412a95 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -682,16 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } mDebug("trans:%d, sync finished", pTrans->id); - -// do it in state machine commit cb -#if 0 - code = sdbWriteWithout(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } -#endif - return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 9a68e0e1f09ccfd643d3794b24a04cbc115d61c1..554556f5b0fd3882d9f8031790c1f3c329f8a1e9 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -86,7 +86,7 @@ static void *mndThreadFp(void *param) { lastTime++; taosMsleep(100); if (pMnode->stopped) break; - if (!mndIsMaster(pMnode)) continue; + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) continue; if (lastTime % (tsTransPullupInterval * 10) == 0) { mndPullupTrans(pMnode); @@ -337,13 +337,12 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { } int32_t mndStart(SMnode *pMnode) { - syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); - syncStart(pMnode->syncMgmt.sync); + mndSyncStart(pMnode); return mndInitTimer(pMnode); } -void mndStop(SMnode *pMnode) { - syncStop(pMnode->syncMgmt.sync); +void mndStop(SMnode *pMnode) { + mndSyncStop(pMnode); return mndCleanupTimer(pMnode); } @@ -357,7 +356,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; - + if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync); assert(pSyncNode != NULL); @@ -444,7 +443,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return ret; - return 0; } @@ -454,7 +452,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); if (IsReq(pMsg)) { - if (!mndIsMaster(pMnode)) { + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) { terrno = TSDB_CODE_APP_NOT_READY; mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); return -1; @@ -514,7 +512,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo) { - if (!mndIsMaster(pMnode)) return -1; + if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) return -1; SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs();