diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2673bc382be3eb0ed6a3d82a9560cb3fe78e2d90..a369b81d2643f4a560ffb50a55d760aaa7bf2c53 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -111,6 +111,7 @@ typedef struct SSyncFSM { void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); + void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 68a33d48cb585d2176ee8a3aa8ea0e914d90079d..fbf124bf4748e716b28cd4e449f7cfdc5f975a84 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer { SRaftId srcId; SRaftId destId; */ + SNodeInfo newNodeInfo; SRaftId newLeaderId; } SyncLeaderTransfer; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 82fde80be22e0ba49480222e872686119220437b..904cdae94eee9acff464aba17fdcb2027e623c71 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -35,7 +35,7 @@ #include "syncVoteMgr.h" #include "tref.h" -bool gRaftDetailLog = true; +bool gRaftDetailLog = false; static int32_t tsNodeRefId = -1; @@ -183,13 +183,21 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t ret = 0; bool IamInNew = false; for (int i = 0; i < pNewCfg->replicaNum; ++i) { - SRaftId newId; - newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); - newId.vgId = pSyncNode->vgId; - if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && + (pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { IamInNew = true; } + + /* + SRaftId newId; + newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); + newId.vgId = pSyncNode->vgId; + if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { + IamInNew = true; + } + */ } + if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_NOT_IN_NEW_CONFIG; @@ -267,7 +275,7 @@ int32_t syncLeaderTransfer(int64_t rid) { } ASSERT(rid == pSyncNode->rid); - if (pSyncNode->peersNum > 0) { + if (pSyncNode->peersNum == 0) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); return TAOS_SYNC_OTHER_ERROR; } @@ -296,6 +304,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.vgId = pSyncNode->vgId; + pMsg->newNodeInfo = newLeader; ASSERT(pMsg != NULL); SRpcMsg rpcMsg = {0}; syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); @@ -1887,10 +1896,51 @@ const char* syncStr(ESyncState state) { } static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { - SyncLeaderTransfer* pSyncLeaderTransfer; + SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); + +/* + char host[128]; + uint16_t port; + syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port); + sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port, + pSyncLeaderTransfer->newLeaderId.addr); +*/ + + sDebug("vgId:%d sync event, begin leader transfer", ths->vgId); + + if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { + sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, + pSyncLeaderTransfer->newLeaderId.addr); + + // reset elect timer now! + int32_t electMS = 1; + int32_t ret = syncNodeRestartElectTimer(ths, electMS); + ASSERT(ret == 0); + } + + +/* if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) { + // reset elect timer now! + int32_t electMS = 1; + int32_t ret = syncNodeRestartElectTimer(ths, electMS); + ASSERT(ret == 0); + } +*/ + if (ths->pFsm->FpLeaderTransferCb != NULL) { + SFsmCbMeta cbMeta; + cbMeta.code = 0; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.flag = 0; + cbMeta.index = pEntry->index; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.state = ths->state; + cbMeta.term = pEntry->term; + ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); } + syncLeaderTransferDestroy(pSyncLeaderTransfer); return 0; } @@ -1992,7 +2042,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ASSERT(code == 0); } - // config change + // leader transfer if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); ASSERT(code == 0); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 0c8b26e9d9f2fa793acc6c103275a6c5737f503e..ebcd7368cc430b38a62a051edff687c85e9ea3e0 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -153,6 +153,16 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe taosMemoryFree(s); } +void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256] = {0}; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==LeaderTransferCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu " + "currentTerm:%lu \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); memset(pFsm, 0, sizeof(*pFsm)); @@ -172,6 +182,8 @@ SSyncFSM* createFsm() { pFsm->FpSnapshotStopWrite = SnapshotStopWrite; pFsm->FpSnapshotDoWrite = SnapshotDoWrite; + pFsm->FpLeaderTransferCb = LeaderTransferCb; + return pFsm; } @@ -277,7 +289,8 @@ void usage(char* exe) { printf( "usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) " "writeRecordNum(>=0) " - "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) \n", + "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) " + "leaderTransfer(0/1) \n", exe); } @@ -294,9 +307,9 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { sprintf(tsTempDir, "%s", "."); tsAsyncLog = 0; - sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR; + sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR + DEBUG_DEBUG; - if (argc != 12) { + if (argc != 13) { usage(argv[0]); exit(-1); } @@ -312,12 +325,14 @@ int main(int argc, char** argv) { int32_t iterTimes = atoi(argv[9]); int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyTerm = atoi(argv[11]); + int32_t leaderTransfer = atoi(argv[12]); - sTrace( + sInfo( "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " - "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d", + "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d, " + "leaderTransfer:%d", replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange, - iterTimes, finishLastApplyIndex, finishLastApplyTerm); + iterTimes, finishLastApplyIndex, finishLastApplyTerm, leaderTransfer); // check parameter assert(replicaNum >= 1 && replicaNum <= 5); @@ -363,24 +378,31 @@ int main(int argc, char** argv) { //--------------------------- int32_t alreadySend = 0; + int32_t leaderTransferWait = 0; while (1) { char* simpleStr = syncNode2SimpleStr(pSyncNode); + leaderTransferWait++; + if (leaderTransferWait == 7) { + sTrace("begin leader transfer ..."); + int32_t ret = syncLeaderTransfer(rid); + } + if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { - sTrace("%s value%d write not leader", simpleStr, alreadySend); + sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else { assert(ret == 0); - sTrace("%s value%d write ok", simpleStr, alreadySend); + sTrace("%s value%d write ok, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } alreadySend++; rpcFreeCont(pRpcMsg->pCont); taosMemoryFree(pRpcMsg); } else { - sTrace("%s", simpleStr); + sTrace("%s, leaderTransferWait:%d", simpleStr, leaderTransferWait); } taosMsleep(1000);