diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 89ed509161e656b703058dd80c890020bed098d7..a55f0f7bdd66e1d9fcb066bd92e8c8655ae251dc 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -191,25 +191,6 @@ typedef struct SyncSnapshotSend { char data[]; } SyncSnapshotSend; -SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId); -void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg); -void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen); -void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg); -char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len); -SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len); -void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg); -void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg); -SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg); -char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg); - -// for debug ---------------------- -void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg); -void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg); -void syncSnapshotSendLog(const SyncSnapshotSend* pMsg); -void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg); - -// --------------------------------------------- typedef struct SyncSnapshotRsp { uint32_t bytes; int32_t vgId; @@ -312,8 +293,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); - -int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); @@ -342,6 +322,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); +int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 93b2531235a0eda18100c2ade6fd7387887e99ca..9b2da52c3ab7a07872c870b5d56789d8438bd591 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -84,7 +84,7 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); // on message -int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); +int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); // start diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 0952f87ed7a2aa56905a2fbfb0f87b4b285e2d18..99473a5e4995c1aa281da2885eb32adeec6c559b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -150,9 +150,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); + code = syncNodeOnSnapshot(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index bee20d7c3d45fc1af1ceac72c19352aca4e970a9..9c319f2a48bff9420684652691c8e0b6ee3b6100 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -223,70 +223,22 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// --------------------------------------------- -SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) { - uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen; - SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { + int32_t bytes = sizeof(SyncSnapshotSend) + dataLen; + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND; - pMsg->dataLen = dataLen; - return pMsg; -} - -void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); - ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen); -} - -char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - return buf; -} - -SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncSnapshotSendDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} -void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) { - syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; + SyncSnapshotSend* pSnapshotSend = pMsg->pCont; + pSnapshotSend->bytes = bytes; + pSnapshotSend->vgId = vgId; + pSnapshotSend->msgType = TDMT_SYNC_SNAPSHOT_SEND; + pSnapshotSend->dataLen = dataLen; + return 0; } SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 256be35f052d73f8a61628337353095778c3dd66..45f4244c9f7134e8c052cc79a25197141650319e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->finish = false; // build begin msg - SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + (void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); + + SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -118,11 +121,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; // send msg - SRpcMsg rpcMsg; - syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); - syncSnapshotSendDestroy(pMsg); // event log sSTrace(pSender, "snapshot sender start"); @@ -176,7 +176,10 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { } // build msg - SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); + + SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -192,11 +195,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); // send msg - SRpcMsg rpcMsg; - syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); - syncSnapshotSendDestroy(pMsg); // event log if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -212,7 +212,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { // send current block data if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { // build msg - SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); + + SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -224,15 +227,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { pMsg->seq = pSender->seq; // pMsg->privateTerm = pSender->privateTerm; - memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); // send msg - SRpcMsg rpcMsg; - syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); - syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); - syncSnapshotSendDestroy(pMsg); // event log sSTrace(pSender, "snapshot sender resend"); @@ -705,7 +704,9 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // // condition 5, got data, update ack // -int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { +int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { + SyncSnapshotSend *pMsg = pRpcMsg->pCont; + // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); @@ -797,13 +798,16 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) } // update next index - syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); // update seq pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; // build begin msg - SyncSnapshotSend *pSendMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + (void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId); + + SyncSnapshotSend *pSendMsg = rpcMsg.pCont; pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; @@ -816,11 +820,8 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; // send msg - SRpcMsg rpcMsg; - syncSnapshotSend2RpcMsg(pSendMsg, &rpcMsg); - syncNodeSendMsgById(&(pSendMsg->destId), pSender->pSyncNode, &rpcMsg); + syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg); syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, ""); - syncSnapshotSendDestroy(pSendMsg); return 0; } diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 8a899406f1e4ac7d3d22f50b2b6ad9d1fe949398..9219a1e78ecb9158200c9091f5bec42d31f6e4d6 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -409,6 +409,24 @@ void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); void syncApplyMsgLog(const SyncApplyMsg* pMsg); void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); +SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId); +void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg); +void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen); +void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg); +char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len); +SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len); +void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg); +void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg); +SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg); +char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg); + +// for debug ---------------------- +void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg); +void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg); +void syncSnapshotSendLog(const SyncSnapshotSend* pMsg); +void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index b797470d729df1ad6dabea358d73f96e77222620..00caf1d80264bae42fc35b91db4ec1aae311b2d3 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -2392,3 +2392,69 @@ void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) { taosMemoryFree(serialized); } } + +// --------------------------------------------- +SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) { + uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen; + SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND; + pMsg->dataLen = dataLen; + return pMsg; +} + +void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen); +} + +char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncSnapshotSendDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) { + syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +}