From 77b03ac821c8c03d8f583d2e52203dd7021e605a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 14:40:42 +0800 Subject: [PATCH] refactor(sync): add restore finish when become leader again --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 12 +-- source/libs/sync/inc/syncInt.h | 3 + source/libs/sync/src/syncMain.c | 129 +++++++++++++++++++------ source/libs/sync/src/syncRaftLog.c | 7 +- source/libs/sync/src/syncRespMgr.c | 15 +-- source/libs/sync/src/syncSnapshot.c | 7 ++ source/libs/sync/test/syncTestTool.cpp | 8 +- 8 files changed, 134 insertions(+), 49 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2d21b3531a..e694203563 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" -#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 typedef uint64_t SyncNodeId; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 63708aef8a..3a243e36d3 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; - rsp.pCont = rpcMallocCont(rsp.contLen); - if (rsp.pCont == NULL) return -1; - memcpy(rsp.pCont, pRaw, rsp.contLen); + SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + req.pCont = rpcMallocCont(req.contLen); + if (req.pCont == NULL) return -1; + memcpy(req.pCont, pRaw, req.contLen); pMgmt->errCode = 0; pMgmt->transId = transId; mTrace("trans:%d, will be proposed", pMgmt->transId); const bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + int32_t code = syncPropose(pMgmt->sync, &req, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { terrno = TSDB_CODE_APP_ERROR; } - rpcFreeCont(rsp.pCont); + rpcFreeCont(req.pCont); if (code != 0) { mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code); return code; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a44dcd9bc..b00c7cbda1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); +bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); SSyncNode* syncNodeAcquire(int64_t rid); @@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); + bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a0d8487b63..5e3c1adf32 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -185,7 +185,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ASSERT(rid == pSyncNode->rid); int32_t ret = 0; - bool IamInNew = false; + bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); + +#if 0 for (int i = 0; i < pNewCfg->replicaNum; ++i) { if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -201,6 +203,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg } */ } +#endif if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -228,7 +231,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } ASSERT(rid == pSyncNode->rid); - bool IamInNew = false; + bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); + +#if 0 for (int i = 0; i < pNewCfg->replicaNum; ++i) { if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -247,6 +252,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } */ } +#endif if (!IamInNew) { sError("sync reconfig error, not in new config"); @@ -556,7 +562,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -565,7 +571,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); + sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -1256,9 +1262,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { +bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { + bool b1 = false; + bool b2 = false; + + for (int i = 0; i < config->replicaNum; ++i) { + if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { + b1 = true; + break; + } + } + + for (int i = 0; i < config->replicaNum; ++i) { + SRaftId raftId; + raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort); + raftId.vgId = pSyncNode->vgId; + + if (syncUtilSameId(&raftId, &(pSyncNode->myRaftId))) { + b2 = true; + break; + } + } + + ASSERT(b1 == b2); + return b1; +} + +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; - pSyncNode->pRaftCfg->cfg = *newConfig; + pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; int32_t ret = 0; @@ -1270,7 +1303,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]); + sDebug("vgId:%d sync event save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, i, oldSenders[i], + oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1322,8 +1356,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId, - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); + sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", pSyncNode->vgId, + (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1341,7 +1375,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i); + sDebug("vgId:%d sync event create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId, + (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1354,8 +1389,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l } } - bool IamInOld = false; - bool IamInNew = false; + bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); + bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); + +#if 0 for (int i = 0; i < oldConfig.replicaNum; ++i) { if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { @@ -1371,6 +1408,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l break; } } +#endif *isDrop = true; if (IamInOld && !IamInNew) { @@ -1379,6 +1417,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l *isDrop = false; } + // may be add me to a new raft group + if (IamInOld && IamInNew && oldConfig.replicaNum == 1) { + } + if (IamInNew) { pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal } @@ -1404,14 +1446,17 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid) void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); - syncNodeBecomeFollower(pSyncNode, "update term"); + char tmpBuf[64]; + snprintf(tmpBuf, sizeof(tmpBuf), "update term to %lu", term); + syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode->pRaftStore); } } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); + sDebug("vgId:%d sync event become follower, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1445,8 +1490,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr); + // reset restoreFinish + pSyncNode->restoreFinish = false; + + sDebug("vgId:%d sync event become leader, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2028,11 +2077,11 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE pSyncLeaderTransfer->newLeaderId.addr); */ - sDebug("vgId:%d sync event, begin leader transfer", ths->vgId); + sDebug("vgId:%d sync event begin leader transfer", ths->vgId); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, + sDebug("vgId:%d sync event maybe leader transfer to %s:%d %lu", ths->vgId, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); @@ -2067,6 +2116,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE return 0; } +int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { + for (int i = 0; i < pNewCfg->replicaNum; ++i) { + SRaftId raftId; + raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + raftId.vgId = ths->vgId; + + if (syncUtilSameId(&(ths->myRaftId), &raftId)) { + pNewCfg->myIndex = i; + return 0; + } + } + + return -1; +} + static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; @@ -2075,19 +2139,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ASSERT(ret == 0); // update new config myIndex - bool IamInNew = false; - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - IamInNew = true; - break; - } - } + syncNodeUpdateNewConfigIndex(ths, &newSyncCfg); + + bool IamInNew = syncNodeInConfig(ths, &newSyncCfg); + + /* + for (int i = 0; i < newSyncCfg.replicaNum; ++i) { + if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && + ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { + newSyncCfg.myIndex = i; + IamInNew = true; + break; + } + } + */ bool isDrop; - // if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) { if (IamInNew) { syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); @@ -2186,7 +2254,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event restore finish, index:%ld", ths->vgId, pEntry->index); + sDebug("vgId:%d sync event restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state), + pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 996cd12e4a..79ce84abe9 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -162,9 +162,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId, - pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", + pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state), + pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, + TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 5087cacd02..adc67b5d8d 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -45,8 +45,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -69,8 +70,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -87,8 +89,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p", pSyncNode->vgId, - TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle); + sDebug("vgId:%d sync event resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c92920a936..545ea953f8 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -591,6 +591,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { int32_t oldReplicaNum = pSyncNode->replicaNum; + // update new config myIndex + SSyncCfg newSyncCfg = pMsg->lastConfig; + syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); + bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg); + +#if 0 // update new config myIndex bool IamInNew = false; SSyncCfg newSyncCfg = pMsg->lastConfig; @@ -602,6 +608,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { break; } } +#endif bool isDrop; if (IamInNew) { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 21049454f4..1eac372bd4 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -297,7 +297,7 @@ void usage(char* exe) { SRpcMsg* createRpcMsg(int i, int count, int myIndex) { SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg)); - pMsg->msgType = 9999; + pMsg->msgType = TDMT_VND_SUBMIT; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); @@ -384,8 +384,10 @@ int main(int argc, char** argv) { leaderTransferWait++; if (leaderTransferWait == 7) { - sTrace("begin leader transfer ..."); - int32_t ret = syncLeaderTransfer(rid); + if (leaderTransfer) { + sTrace("begin leader transfer ..."); + int32_t ret = syncLeaderTransfer(rid); + } } if (alreadySend < writeRecordNum) { -- GitLab