From d7d8d82eecd8dc3eaae6656b5df0b0a667efa4a2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 12 Jul 2022 11:08:01 +0800 Subject: [PATCH] refactor(sync): modify log to index --- source/libs/sync/src/syncMain.c | 46 +++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 50e2588e19..ff3e554c8d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -316,6 +316,40 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { return ret; } +int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { + if (pSyncNode->peersNum == 0) { + sError("only one replica, cannot leader transfer"); + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; + } + + SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; + int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + return ret; +} + +int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { + int32_t ret = 0; + + if (pSyncNode->replicaNum == 1) { + sError("only one replica, cannot leader transfer"); + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; + } + + SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); + pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); + pMsg->newLeaderId.vgId = pSyncNode->vgId; + pMsg->newNodeInfo = newLeader; + ASSERT(pMsg != NULL); + SRpcMsg rpcMsg = {0}; + syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); + syncLeaderTransferDestroy(pMsg); + + ret = syncNodePropose(pSyncNode, &rpcMsg, false); + return ret; +} + bool syncCanLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -1113,6 +1147,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); + // leader transfer + int32_t ret; ASSERT(pSyncNode != NULL); @@ -1527,7 +1563,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", fristindex:%" PRId64 ", lastindex:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " "strategy:%d, batch:%d, " @@ -1548,7 +1584,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", fristindex:%" PRId64 ", lastindex:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " "strategy:%d, batch:%d, " @@ -1594,7 +1630,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", fristindex:%" PRId64 ", lastindex:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " @@ -1613,7 +1649,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", fristindex:%" PRId64 ", lastindex:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " @@ -1644,7 +1680,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); snprintf(s, len, - "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", beginlog:%" PRId64 ", lastlog:%" PRId64 + "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", fristindex:%" PRId64 ", lastindex:%" PRId64 ", lastsnapshot:%" PRId64 ", standby:%d, " "replica-num:%d, " -- GitLab