diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 2abe0e5c737c8dd52c92cc0e34a052f44155e298..3a885dd3da74f12063b421e3073005f0ed213cd8 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -304,13 +304,14 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); /** - * @brief Update the version of sdb + * @brief Update the index of sdb * * @param pSdb The sdb object. - * @param val The update value of the version. - * @return int32_t The current version of sdb + * @param index The update value of the apply index. + * @return int32_t The current index of sdb */ -int64_t sdbUpdateVer(SSdb *pSdb, int32_t val); +void sdbSetApplyIndex(SSdb *pSdb, int64_t index); +int64_t sdbGetApplyIndex(SSdb *pSdb); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0b7ed04819b1b173852ab39c4f4efc202ee8b9cf..df7949f6a5a7ae347175bf9a961b2b06eb66314a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -128,38 +128,35 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot = {0}; - pFsm->FpGetSnapshot(pFsm, &snapshot); - beginIndex = snapshot.lastApplyIndex; - } - - if (cbMeta.index > beginIndex) { - SMnode *pMnode = pFsm->data; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; - pApplyMsg->info.node = pFsm->data; - mndProcessApplyMsg(pApplyMsg); - sdbUpdateVer(pMnode->pSdb, 1); + SMnode *pMnode = pFsm->data; + SSdb *pSdb = pMnode->pSdb; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + SyncIndex lastApply = sdbGetApplyIndex(pSdb); + SSdbRaw *pRaw = pMsg->pCont; + if (cbMeta.index > lastApply) { + mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); } + } else { + mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply); } } -void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - // snapshot +static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { + SMnode *pMnode = pFsm->data; + pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb); return 0; } @@ -182,23 +179,25 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - SSyncInfo syncInfo = {.vgId = 1}; - SSyncCfg *pCfg = &(syncInfo.syncCfg); + SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg}; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); + syncInfo.pWal = pMgmt->pWal; + syncInfo.pFsm = mndSyncMakeFsm(pMnode); + + SSyncCfg *pCfg = &syncInfo.syncCfg; pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; for (int32_t i = 0; i < pMnode->replica; ++i) { - SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i]; - tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn)); - pNodeInfo->nodePort = pMnode->replicas[i].port; + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pMnode->replicas[i].port; } - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); - syncInfo.pWal = pMnode->syncMgmt.pWal; - syncInfo.pFsm = mndSyncMakeFsm(pMnode); - syncInfo.FpSendMsg = mndSyncSendMsg; - syncInfo.FpEqMsg = mndSyncEqMsg; - pMnode->syncMgmt.sync = syncOpen(&syncInfo); - ASSERT(pMnode->syncMgmt.sync > 0); + pMgmt->sync = syncOpen(&syncInfo); + if (pMgmt->sync <= 0) { + mError("failed to open sync since %s", terrstr()); + return -1; + } return 0; } @@ -209,16 +208,6 @@ void mndCleanupSync(SMnode *pMnode) { mndCloseWal(pMnode); } -static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { - SMnode *pMnode = pData; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - pMgmt->errCode = 0; - tsem_post(&pMgmt->syncSem); - - return 0; -} - int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index b93adf99305492fec346a1fb981aef7e4e55979f..6808c412d85ab2261fe52f45b04161ed157b7281 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 ); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1); + sdbSetApplyIndex(pSdb, -1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), -1); ASSERT_EQ(mnode.insertTimes, 2); ASSERT_EQ(mnode.deleteTimes, 0); @@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1); ASSERT_EQ(mnode.insertTimes, 3); ASSERT_EQ(mnode.deleteTimes, 0); @@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) { } // write version - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); - ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1); + sdbSetApplyIndex(pSdb, 0); + sdbSetApplyIndex(pSdb, 1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); ASSERT_EQ(sdbWriteFile(pSdb), 0); ASSERT_EQ(sdbWriteFile(pSdb), 0); @@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5); - ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1); + ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); ASSERT_EQ(mnode.insertTimes, 4); ASSERT_EQ(mnode.deleteTimes, 0); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 6e5dde57a6ddb1f45c60a65542e6ea282836d94c..10bf4126c55918587c393c1e32bb71e5e4e222e1 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -156,4 +156,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } -int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); } \ No newline at end of file +void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } + +int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }