From e79e50ae364da43ec07820e61dfb3d5a130146eb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 11:45:58 +0800 Subject: [PATCH] enh: refact syncMsg code --- include/dnode/mnode/mnode.h | 1 - include/libs/sync/sync.h | 12 +- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 20 +-- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 18 +- source/dnode/mnode/impl/src/mndMain.c | 120 +------------ source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/vnd/vnodeSync.c | 134 +------------- source/libs/sync/inc/syncInt.h | 1 - source/libs/sync/src/syncEnv.c | 1 + source/libs/sync/src/syncMain.c | 190 +++++++++----------- 10 files changed, 102 insertions(+), 396 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index c9e47c25b7..cdb1642a5c 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -99,7 +99,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1a94dcf426..fc926cb1b4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -220,21 +220,13 @@ const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); -int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg); - -// build SRpcMsg, need to call syncPropose with SRpcMsg -int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg); - +int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncLeaderTransfer(int64_t rid); -int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); - int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncEndSnapshot(int64_t rid); - int32_t syncStepDown(int64_t rid, SyncTerm newTerm); -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); +int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index e0a39a6bf1..e50b527bac 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -67,24 +67,6 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmProcessSyncCtrlMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SMnodeMgmt *pMgmt = pInfo->ahandle; - pMsg->info.node = pMgmt->pMnode; - - const STraceId *trace = &pMsg->info.traceId; - dGTrace("msg:%p, get from mnode-sync-ctrl queue", pMsg); - - SMsgHead *pHead = pMsg->pCont; - pHead->contLen = ntohl(pHead->contLen); - pHead->vgId = ntohl(pHead->vgId); - - int32_t code = mndProcessSyncCtrlMsg(pMsg); - - dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; pMsg->info.node = pMgmt->pMnode; @@ -252,7 +234,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-sync-ctrl", - .fp = (FItem)mmProcessSyncCtrlMsg, + .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d5b1e18c7b..71e2ece733 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -133,22 +133,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } -static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - - for (int32_t i = 0; i < numOfMsgs; ++i) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - const STraceId *trace = &pMsg->info.traceId; - dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); - - int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here - dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; SMsgHead *pHead = pMsg->pCont; @@ -317,7 +301,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue); + pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 53e7b1cd26..891f2bbcd8 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -474,128 +474,18 @@ void mndStop(SMnode *pMnode) { mndCleanupTimer(pMnode); } -int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = 0; - - mInfo("vgId:%d, process sync ctrl msg", 1); - - if (!syncIsInit()) { - mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); - if (pSyncNode == NULL) { - mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { - SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); - code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); - syncHeartbeatDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { - SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); - code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); - syncHeartbeatReplyDestroy(pSyncMsg); - } - - syncNodeRelease(pSyncNode); - - if (code != 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - } - return code; -} - int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = 0; - - if (!syncIsInit()) { - mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); - if (pSyncNode == NULL) { - mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - code = syncNodeOnTimer(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPing(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReply(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { - SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); - code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); - syncLocalCmdDestroy(pSyncMsg); - - } else { - mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = -1; - } - syncNodeRelease(pSyncNode); + const STraceId *trace = &pMsg->info.traceId; + mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + int32_t code = syncProcessMsg(pMgmt->sync, pMsg); if (code != 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); } + return code; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6200c88d75..97681ae67c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -83,7 +83,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 339be16c91..7f2214d298 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -230,142 +230,16 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } } -int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t code = 0; - const STraceId *trace = &pMsg->info.traceId; - - if (!syncIsInit()) { - vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); - terrno = TSDB_CODE_APP_ERROR; - return -1; - } - - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); - if (pSyncNode == NULL) { - vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - - vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - - if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { - SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); - code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); - syncHeartbeatDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { - SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); - code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); - syncHeartbeatReplyDestroy(pSyncMsg); - - } else { - vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); - code = -1; - } - - vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), - code); - syncNodeRelease(pSyncNode); - if (code != 0 && terrno == 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - } - return code; -} - int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - - if (!syncIsInit()) { - vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); - terrno = TSDB_CODE_APP_ERROR; - return -1; - } - - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); - if (pSyncNode == NULL) { - vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnTimer(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPing(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingReply(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { - SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); - code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); - syncLocalCmdDestroy(pSyncMsg); - - } else { - vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); - code = -1; + int32_t code = syncProcessMsg(pVnode->sync, pMsg); + if (code != 0) { + vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg, + TMSG_INFO(pMsg->msgType), terrstr()); } - vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), - code); - syncNodeRelease(pSyncNode); - if (code != 0 && terrno == 0) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - } return code; } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 843dee1342..5e8041e54a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -302,7 +302,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); bool syncNodeCanChange(SSyncNode* pSyncNode); -bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index c55cd4fdac..cf0d4d25e8 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -80,6 +80,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) { SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); if (pNode == NULL) { sTrace("failed to acquire node from refId:%" PRId64, rid); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return pNode; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 89499a7c7d..69c8feb256 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" @@ -34,8 +35,6 @@ #include "syncUtil.h" #include "syncVoteMgr.h" -// ------ local funciton --------- -// enqueue message ---- static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); @@ -44,158 +43,145 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg); -// process message ---- -int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); - int64_t syncOpen(SSyncInfo* pSyncInfo) { - SSyncNode* pNode = syncNodeOpen(pSyncInfo); - if (pNode == NULL) { + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); + if (pSyncNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; } - pNode->rid = syncNodeAdd(pNode); - if (pNode->rid < 0) { - syncNodeClose(pNode); + pSyncNode->rid = syncNodeAdd(pSyncNode); + if (pSyncNode->rid < 0) { + syncNodeClose(pSyncNode); return -1; } - return pNode->rid; + return pSyncNode->rid; } void syncStart(int64_t rid) { - SSyncNode* pNode = syncNodeAcquire(rid); - if (pNode != NULL) { - syncNodeStart(pNode); - syncNodeRelease(pNode); + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode != NULL) { + syncNodeStart(pSyncNode); + syncNodeRelease(pSyncNode); } } void syncStop(int64_t rid) { - SSyncNode* pNode = syncNodeAcquire(rid); - if (pNode != NULL) { - syncNodeRelease(pNode); - syncNodeRemove(rid); - } -} - -bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { - bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); - if (!IamInNew) { - return false; - } - - if (pNewCfg->replicaNum > pSyncNode->replicaNum + 1) { - return false; - } - - if (pNewCfg->replicaNum < pSyncNode->replicaNum - 1) { - return false; - } - - return true; -} - -int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); - int32_t ret = 0; - - if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { + if (pSyncNode != NULL) { syncNodeRelease(pSyncNode); - terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("invalid new config. vgId:%d", pSyncNode->vgId); - return -1; + syncNodeRemove(rid); } +} - char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); - pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE; - pRpcMsg->info.noResp = 1; - pRpcMsg->contLen = strlen(newconfig) + 1; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); - taosMemoryFree(newconfig); - - syncNodeRelease(pSyncNode); - return ret; +static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { + if (!syncNodeInConfig(pSyncNode, pCfg)) return false; + return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1; } int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("invalid new config. vgId:%d", pSyncNode->vgId); + sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId); return -1; } -#if 0 - char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); - int32_t ret = 0; - - SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; - rpcMsg.info.noResp = 1; - rpcMsg.contLen = strlen(newconfig) + 1; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig); - taosMemoryFree(newconfig); - ret = syncNodePropose(pSyncNode, &rpcMsg, false); - - syncNodeRelease(pSyncNode); - return ret; -#else syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { syncNodeStopHeartbeatTimer(pSyncNode); for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { - syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]); + syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]); } syncNodeStartHeartbeatTimer(pSyncNode); - syncNodeReplicate(pSyncNode); } + syncNodeRelease(pSyncNode); return 0; -#endif } -int32_t syncLeaderTransfer(int64_t rid) { +int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { + int32_t code = -1; + if (!syncIsInit()) return code; + SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; + if (pSyncNode == NULL) return code; + + if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { + SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); + syncHeartbeatDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { + SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); + syncHeartbeatReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + code = syncNodeOnTimer(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing* pSyncMsg = syncPingFromRpcMsg2(pMsg); + code = syncNodeOnPing(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply* pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + code = syncNodeOnPingReply(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest* pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { + SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); + code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { + SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); + code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); + syncLocalCmdDestroy(pSyncMsg); + } else { + sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType)); + code = -1; } - ASSERT(rid == pSyncNode->rid); - int32_t ret = syncNodeLeaderTransfer(pSyncNode); syncNodeRelease(pSyncNode); - return ret; + return code; } -int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { +int32_t syncLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - ASSERT(rid == pSyncNode->rid); + if (pSyncNode == NULL) return -1; - int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); + int32_t ret = syncNodeLeaderTransfer(pSyncNode); syncNodeRelease(pSyncNode); return ret; } @@ -3675,4 +3661,4 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s); syncNodeEventLog(pSyncNode, logBuf); -} \ No newline at end of file +} -- GitLab