diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 07f65b2a90a3f70498568091941fd76be6e644f8..8666739dab0d4e45cdcb0f5eabe2d4f3804dd4f3 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -75,12 +75,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { - // TODO: - - // atomic operation - // step1. sdbGetCommitInfo - // step2. create ppReader with pReaderParam - + mInfo("start to read snapshot from sdb in atomic way"); + SMnode *pMnode = pFsm->data; + return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, + &pSnapshot->lastConfigIndex); return 0; } @@ -106,14 +104,6 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; -#if 0 -// send response - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); -#endif - pMgmt->errCode = cbMeta.code; mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); @@ -130,7 +120,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; - return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader); + return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); } int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index bc118ee26e5996e28e6ef3c1380a71a7f868dd3f..87895d3b4930ff78ab4b147f688f98ff9cd71229 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -910,7 +910,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { int32_t len = 0; int32_t code = 0; - code = sdbStartRead(pSdb, &pReader); + code = sdbStartRead(pSdb, &pReader, NULL, NULL, NULL); ASSERT_EQ(code, 0); code = sdbStartWrite(pSdb, &pWritter); ASSERT_EQ(code, 0); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3b1c4000a83957ea5c7360e40857e9fd4aee8aa2..1294f0cff3446dca7fb7dcdfc6d293d947d13852 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -388,7 +388,7 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc); -int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter); +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config); int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter); int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 0f4e1276c1612ad0f1707057d9e052eb307fe452..47c4a4ed0fe982c64ed8358f66e6d9d0ee6e4f8b 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -523,7 +523,7 @@ static void sdbCloseIter(SSdbIter *pIter) { taosMemoryFree(pIter); } -int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { +int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) { SSdbIter *pIter = sdbCreateIter(pSdb); if (pIter == NULL) return -1; @@ -552,6 +552,10 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { } *ppIter = pIter; + if (index != NULL) *index = commitIndex; + if (term != NULL) *term = commitTerm; + if (config != NULL) *config = commitConfig; + mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter, commitIndex, commitTerm, commitConfig, pIter->name); return 0;