Review notes (free text ahead of the first "diff --git" header; git-apply and patch skip it).
Purpose of the patch: tag every mnode sync proposal with a transaction id (SSyncMgmt.transId, -1 for a sync reconfig) so that mndSyncCommitMsg/mndReConfig post syncSem only for the proposal whose id matches the saved one, and record cbMeta.code in SSyncMgmt.errCode so mndSyncPropose can return it.
Amendments made in this revision of the patch:
 - mndMnode.c (mndProcessAlterMnodeReq): errCode/transId are now assigned before syncReconfig() is called, so a reconfig callback that runs before the call returns still sees transId == -1 and posts syncSem.
 - mndSync.c (mndReConfig): the "curTerm:" log field now prints cbMeta.currentTerm; it was being fed cbMeta.index.
 - mndSync.c (mndSyncPropose): the failure log prints the transId argument instead of re-reading the shared pMgmt->transId.
NOTE(review): the blob hashes on the "index" lines of mndMnode.c and mndSync.c were not regenerated for these amendments.
NOTE(review): sdbGetIdFromRaw() reads the first 4 bytes of pRaw->pData as the id; presumably only transaction raws carry an id there - confirm for other raw types.
NOTE(review): checkError.sim relaxes the accepted valgrind result from <= 1 to <= 2 - confirm that is intended.
diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 189ea82bfc8f53f2ebdf84c04214bf80e6f21882..cec49a4cbeae969774ee80bedf8cbe1900f85e5a 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -81,6 +81,7 @@ typedef struct { bool standby; bool restored; int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 356f215267fcfd76f5a851202c6290b9433796ee..cb9d70d5ee48f542dbe58100328b7f2284ea2926 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId); void mndSyncStart(SMnode *pMnode); void mndSyncStop(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 344b4f6263a3a0e5ea5af5bd0be1e4b1bce94157..03013a96ded9ccb6776bc8cf3ca68455033802ed 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -702,14 +702,17 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { } } + mTrace("trans:-1, sync reconfig will be proposed"); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->standby = 0; + pMgmt->errCode = 0; + pMgmt->transId = -1; int32_t code = syncReconfig(pMgmt->sync, &cfg); if (code != 0) { - mError("failed to alter mnode sync since %s", terrstr()); + mError("trans:-1, failed to propose sync reconfig since %s", terrstr()); return code; } else { - pMgmt->errCode = 0; tsem_wait(&pMgmt->syncSem); mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); terrno = pMgmt->errCode; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index
f34ab28cce4fa51a7f9596ffada04971d7e3c5d6..16d836c8170886b6e9349a55201d2cc9d1224732 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -28,16 +28,26 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - SSdbRaw *pRaw = pMsg->pCont; - - mTrace("raw:%p, apply to sdb, ver:%" PRId64 " term:%" PRId64 " role:%s", pRaw, cbMeta.index, cbMeta.term, - syncStr(cbMeta.state)); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); - sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMnode->syncMgmt.syncSem); + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + SSdbRaw *pRaw = pMsg->pCont; + + int32_t transId = sdbGetIdFromRaw(pRaw); + pMgmt->errCode = cbMeta.code; + mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId, + pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw); + + if (pMgmt->errCode == 0) { + sdbWriteWithoutFree(pMnode->pSdb, pRaw); + sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); + } + + if (pMgmt->transId == transId) { + if (pMgmt->errCode != 0) { + mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); + } + tsem_post(&pMgmt->syncSem); } } @@ -78,11 +88,19 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char } void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64, - cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); - SMnode *pMnode =
pFsm->data; - pMnode->syncMgmt.errCode = cbMeta.code; - tsem_post(&pMnode->syncMgmt.syncSem); + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + pMgmt->errCode = cbMeta.code; + mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, curTerm:%" PRId64 " term:%" PRId64, + pMgmt->transId, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + + if (pMgmt->transId == -1) { + if (pMgmt->errCode != 0) { + mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); + } + tsem_post(&pMgmt->syncSem); + } } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -165,15 +183,17 @@ void mndCleanupSync(SMnode *pMnode) { memset(pMgmt, 0, sizeof(SSyncMgmt)); } -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->errCode = 0; - - SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; rsp.pCont = rpcMallocCont(rsp.contLen); if (rsp.pCont == NULL) return -1; memcpy(rsp.pCont, pRaw, rsp.contLen); + pMgmt->errCode = 0; + pMgmt->transId = transId; + mTrace("trans:%d, will be proposed", pMgmt->transId); + const bool isWeak = false; int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { @@ -187,7 +207,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } rpcFreeCont(rsp.pCont); - if (code != 0) return code; + if (code != 0) { + mError("trans:%d, failed to propose, code:0x%x", transId, code); + return code; + } + return pMgmt->errCode; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 16c56a63df9e11c9ed8745398e62156e9d8fff14..a8e78ddafeae7861456c7271ec978097c56ef9f5 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -680,7 +680,7 @@ static int32_t mndTransSync(SMnode *pMnode,
STrans *pTrans) { sdbSetRawStatus(pRaw, SDB_STATUS_READY); mDebug("trans:%d, sync to other nodes", pTrans->id); - int32_t code = mndSyncPropose(pMnode, pRaw); + int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); sdbFreeRaw(pRaw); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3d9148360a08ede04e527cd4318fa233689ddf98..3932defd8db79e0a83cfec078c01e8a97e0ec3d5 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -386,6 +386,8 @@ SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len); const char *sdbTableName(ESdbType type); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); +int32_t sdbGetIdFromRaw(SSdbRaw *pRaw); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index ba3b00c12dab08825d0060657f503f6daaa17936..4b61ebb627622bfdb1bb32a1df591d564a7c7b01 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -16,6 +16,11 @@ #define _DEFAULT_SOURCE #include "sdb.h" +int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) { + int32_t id = *((int32_t *)(pRaw->pData)); + return id; +} + SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { SSdbRaw *pRaw = taosMemoryCalloc(1, dataLen + sizeof(SSdbRaw)); if (pRaw == NULL) { diff --git a/tests/script/tsim/mnode/basic2.sim b/tests/script/tsim/mnode/basic2.sim index 6df37044698e9cc6486ee5214181314472436d6f..18aa85cf5bb00a579b8ed7be14b264845a37948a 100644 --- a/tests/script/tsim/mnode/basic2.sim +++ b/tests/script/tsim/mnode/basic2.sim @@ -123,5 +123,12 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 flo sql create table db.ctb using db.stb tags(101, 102, "103") sql insert into db.ctb values(now, 1, "2") +sql select * from db.ctb +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4]
$data[0][5] $data[0][6] + +if $rows != 1 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop system sh/exec.sh -n dnode2 -s stop \ No newline at end of file diff --git a/tests/script/tsim/valgrind/checkError.sim b/tests/script/tsim/valgrind/checkError.sim index 97d16dba9663a77fdf96fe1741d045765a306d42..b0bc0ac83236640276307b43a7ebab02bf6361e1 100644 --- a/tests/script/tsim/valgrind/checkError.sim +++ b/tests/script/tsim/valgrind/checkError.sim @@ -71,7 +71,7 @@ print ====> start to check if there are ERRORS in vagrind log file for each dnod # -n : dnode[x] be check system_content sh/checkValgrind.sh -n dnode1 print cmd return result----> [ $system_content ] -if $system_content <= 1 then +if $system_content <= 2 then return 0 endi