diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 273bd00922b07f792dd33d9fa6ffaf1d04c513b1..5e1cec55a9cf47efbe9c8b717403fe7ab1c02122 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -541,6 +541,16 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { code = syncSetStandby(pMgmt->sync); SRpcMsg rsp = {.code = code, .info = pMsg->info}; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 061cd9cd3a8c3c5059caddcad4c2af637884a805..559cd050ccfee6e73f7b5ae920d45428e34f6580 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -385,19 +385,18 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - + } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); code = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); @@ -410,12 +409,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { - SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); - ASSERT(pSyncMsg != NULL); - code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); - syncClientRequestBatchDestroyDeep(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); @@ -439,16 +432,16 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ASSERT(pSyncMsg != NULL); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode); @@ -457,11 +450,10 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tmsgSendRsp(&rsp); } else { - vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); - code = -1; + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); + code = -1; } - #if 0 if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { @@ -935,4 +927,3 @@ bool vnodeIsLeader(SVnode *pVnode) { return true; } - diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 8d21d5fabdfdffb32bf1c021c0fd4e965b76f262..57e259e4bcaf1e7d640ec1b1490cf6ae089966f2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -65,8 +65,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); @@ -74,13 +74,13 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; } SSyncSnapshotReceiver; @@ -92,8 +92,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index c461f4cff1ca24c57eb56eeb91c42271405a4fd3..c83e0082c363ee0ce2caaaa8f73fbabd91ee1336 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -379,14 +379,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -674,7 +674,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -707,14 +707,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128];