提交 f47a28fe 编写于 作者: M Minghao Li

enh(sync): add leader transfer

上级 b1093b4a
...@@ -111,6 +111,7 @@ typedef struct SSyncFSM { ...@@ -111,6 +111,7 @@ typedef struct SSyncFSM {
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
......
...@@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer { ...@@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer {
SRaftId srcId; SRaftId srcId;
SRaftId destId; SRaftId destId;
*/ */
SNodeInfo newNodeInfo;
SRaftId newLeaderId; SRaftId newLeaderId;
} SyncLeaderTransfer; } SyncLeaderTransfer;
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tref.h" #include "tref.h"
bool gRaftDetailLog = true; bool gRaftDetailLog = false;
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
...@@ -183,13 +183,21 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ...@@ -183,13 +183,21 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t ret = 0; int32_t ret = 0;
bool IamInNew = false; bool IamInNew = false;
for (int i = 0; i < pNewCfg->replicaNum; ++i) { for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
SRaftId newId; SRaftId newId;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort); newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId; newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) { if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true; IamInNew = true;
} }
*/
} }
if (!IamInNew) { if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_NOT_IN_NEW_CONFIG; return TAOS_SYNC_NOT_IN_NEW_CONFIG;
...@@ -267,7 +275,7 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -267,7 +275,7 @@ int32_t syncLeaderTransfer(int64_t rid) {
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
if (pSyncNode->peersNum > 0) { if (pSyncNode->peersNum == 0) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_OTHER_ERROR; return TAOS_SYNC_OTHER_ERROR;
} }
...@@ -296,6 +304,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { ...@@ -296,6 +304,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader;
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
...@@ -1887,10 +1896,51 @@ const char* syncStr(ESyncState state) { ...@@ -1887,10 +1896,51 @@ const char* syncStr(ESyncState state) {
} }
/*
 * Apply a committed leader-transfer entry on this node.
 *
 * Invoked from syncNodeCommit() for entries whose originalRpcType is
 * TDMT_SYNC_LEADER_TRANSFER. The transfer request is decoded from pRpcMsg;
 * if the requested new leader is this node (same FQDN and port as
 * ths->myNodeInfo), the election timer is restarted with a 1 ms timeout so
 * that this node starts campaigning right away. Afterwards the FSM's
 * FpLeaderTransferCb, when registered, is notified with the entry metadata.
 *
 * ths      - sync node the entry is applied on
 * pRpcMsg  - RPC message carrying the serialized SyncLeaderTransfer
 * pEntry   - raft log entry supplying index/term/seqNum/isWeak for the callback
 *
 * Returns 0. Failures are trapped by ASSERT rather than reported to the caller.
 */
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
  SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
  // Every access below dereferences the decoded message; fail loudly here
  // instead of crashing at an arbitrary field access.
  ASSERT(pSyncLeaderTransfer != NULL);

  sDebug("vgId:%d sync event, begin leader transfer", ths->vgId);

  // The target is matched by FQDN and port.
  // NOTE(review): an earlier variant compared SRaftId values via
  // syncUtilSameId(); it was disabled in this change - confirm the reason
  // before switching back.
  const SNodeInfo* pTarget = &(pSyncLeaderTransfer->newNodeInfo);
  bool             iAmTarget = (strcmp(pTarget->nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0) &&
                   (pTarget->nodePort == ths->myNodeInfo.nodePort);

  if (iAmTarget) {
    // The address is printed through an explicit unsigned long long cast so
    // the specifier is correct on targets where long is 32 bits wide.
    sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %llu", ths->vgId, pTarget->nodeFqdn,
           pTarget->nodePort, (unsigned long long)pSyncLeaderTransfer->newLeaderId.addr);

    // reset elect timer now!
    int32_t electMS = 1;
    int32_t ret = syncNodeRestartElectTimer(ths, electMS);
    ASSERT(ret == 0);
  }

  if (ths->pFsm->FpLeaderTransferCb != NULL) {
    // Zero-initialize so that any field not assigned below reaches the
    // callback as 0 instead of indeterminate stack content.
    SFsmCbMeta cbMeta = {0};
    cbMeta.code = 0;
    cbMeta.currentTerm = ths->pRaftStore->currentTerm;
    cbMeta.flag = 0;
    cbMeta.index = pEntry->index;
    cbMeta.isWeak = pEntry->isWeak;
    cbMeta.seqNum = pEntry->seqNum;
    cbMeta.state = ths->state;
    cbMeta.term = pEntry->term;
    ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta);
  }

  syncLeaderTransferDestroy(pSyncLeaderTransfer);
  return 0;
}
...@@ -1992,7 +2042,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -1992,7 +2042,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ASSERT(code == 0); ASSERT(code == 0);
} }
// config change // leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
......
...@@ -153,6 +153,16 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe ...@@ -153,6 +153,16 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe
taosMemoryFree(s); taosMemoryFree(s);
} }
/*
 * Test FSM hook for leader-transfer entries: formats the callback metadata
 * into a one-line header and hands it, together with the RPC message, to
 * syncRpcMsgLog2() for logging. It only formats and logs.
 */
void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
  char        header[256] = {0};
  const char* stateStr = syncUtilState2String(cbMeta.state);

  // snprintf bounds the write to the buffer; an over-long header is truncated.
  snprintf(header, sizeof(header),
           "==callback== ==LeaderTransferCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, "
           "state:%d %s, flag:%lu, term:%lu currentTerm:%lu \n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, stateStr, cbMeta.flag, cbMeta.term,
           cbMeta.currentTerm);

  // The logging helper takes a non-const pointer, hence the cast.
  syncRpcMsgLog2(header, (SRpcMsg*)pMsg);
}
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm)); memset(pFsm, 0, sizeof(*pFsm));
...@@ -172,6 +182,8 @@ SSyncFSM* createFsm() { ...@@ -172,6 +182,8 @@ SSyncFSM* createFsm() {
pFsm->FpSnapshotStopWrite = SnapshotStopWrite; pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pFsm->FpSnapshotDoWrite = SnapshotDoWrite; pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
pFsm->FpLeaderTransferCb = LeaderTransferCb;
return pFsm; return pFsm;
} }
...@@ -277,7 +289,8 @@ void usage(char* exe) { ...@@ -277,7 +289,8 @@ void usage(char* exe) {
printf( printf(
"usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) " "usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) "
"writeRecordNum(>=0) " "writeRecordNum(>=0) "
"isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) \n", "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) "
"leaderTransfer(0/1) \n",
exe); exe);
} }
...@@ -294,9 +307,9 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { ...@@ -294,9 +307,9 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
sprintf(tsTempDir, "%s", "."); sprintf(tsTempDir, "%s", ".");
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR; sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR + DEBUG_DEBUG;
if (argc != 12) { if (argc != 13) {
usage(argv[0]); usage(argv[0]);
exit(-1); exit(-1);
} }
...@@ -312,12 +325,14 @@ int main(int argc, char** argv) { ...@@ -312,12 +325,14 @@ int main(int argc, char** argv) {
int32_t iterTimes = atoi(argv[9]); int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]); int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]);
sTrace( sInfo(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
"isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d", "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d, "
"leaderTransfer:%d",
replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange, replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange,
iterTimes, finishLastApplyIndex, finishLastApplyTerm); iterTimes, finishLastApplyIndex, finishLastApplyTerm, leaderTransfer);
// check parameter // check parameter
assert(replicaNum >= 1 && replicaNum <= 5); assert(replicaNum >= 1 && replicaNum <= 5);
...@@ -363,24 +378,31 @@ int main(int argc, char** argv) { ...@@ -363,24 +378,31 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
int32_t alreadySend = 0; int32_t alreadySend = 0;
int32_t leaderTransferWait = 0;
while (1) { while (1) {
char* simpleStr = syncNode2SimpleStr(pSyncNode); char* simpleStr = syncNode2SimpleStr(pSyncNode);
leaderTransferWait++;
if (leaderTransferWait == 7) {
sTrace("begin leader transfer ...");
int32_t ret = syncLeaderTransfer(rid);
}
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
sTrace("%s value%d write not leader", simpleStr, alreadySend); sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} else { } else {
assert(ret == 0); assert(ret == 0);
sTrace("%s value%d write ok", simpleStr, alreadySend); sTrace("%s value%d write ok, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} }
alreadySend++; alreadySend++;
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
taosMemoryFree(pRpcMsg); taosMemoryFree(pRpcMsg);
} else { } else {
sTrace("%s", simpleStr); sTrace("%s, leaderTransferWait:%d", simpleStr, leaderTransferWait);
} }
taosMsleep(1000); taosMsleep(1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册