提交 eec0ab8c 编写于 作者: S Shengliang Guan

refact: adjust sync snapshotsend msg

上级 d5f7f8c3
...@@ -191,25 +191,6 @@ typedef struct SyncSnapshotSend { ...@@ -191,25 +191,6 @@ typedef struct SyncSnapshotSend {
char data[]; char data[];
} SyncSnapshotSend; } SyncSnapshotSend;
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId);
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg);
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen);
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg);
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len);
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len);
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg);
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg);
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg);
char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg);
// for debug ----------------------
void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg);
void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
// ---------------------------------------------
typedef struct SyncSnapshotRsp { typedef struct SyncSnapshotRsp {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -312,8 +293,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); ...@@ -312,8 +293,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
...@@ -342,6 +322,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); ...@@ -342,6 +322,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta);
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -84,7 +84,7 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) ...@@ -84,7 +84,7 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver)
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
// on message // on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
// start // start
......
...@@ -150,9 +150,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -150,9 +150,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); code = syncNodeOnSnapshot(pSyncNode, pMsg);
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
......
...@@ -223,70 +223,22 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { ...@@ -223,70 +223,22 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
return 0; return 0;
} }
// --------------------------------------------- int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) { int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen; pMsg->pCont = rpcMallocCont(bytes);
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND; pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
pMsg->dataLen = dataLen; pMsg->contLen = bytes;
return pMsg; if (pMsg->pCont == NULL) {
} terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen);
}
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
} }
return buf;
}
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncSnapshotSendDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) { SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pSnapshotSend->bytes = bytes;
pRpcMsg->msgType = pMsg->msgType; pSnapshotSend->vgId = vgId;
pRpcMsg->contLen = pMsg->bytes; pSnapshotSend->msgType = TDMT_SYNC_SNAPSHOT_SEND;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); pSnapshotSend->dataLen = dataLen;
syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); return 0;
}
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) {
syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
} }
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) { SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) {
......
...@@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->finish = false; pSender->finish = false;
// build begin msg // build begin msg
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId);
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
...@@ -118,11 +121,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -118,11 +121,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// send msg // send msg
SRpcMsg rpcMsg; syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log // event log
sSTrace(pSender, "snapshot sender start"); sSTrace(pSender, "snapshot sender start");
...@@ -176,7 +176,10 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -176,7 +176,10 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
} }
// build msg // build msg
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
...@@ -192,11 +195,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -192,11 +195,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
// send msg // send msg
SRpcMsg rpcMsg; syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log // event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
...@@ -212,7 +212,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -212,7 +212,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
// send current block data // send current block data
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
// build msg // build msg
SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
...@@ -224,15 +227,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -224,15 +227,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq;
// pMsg->privateTerm = pSender->privateTerm; // pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
// send msg // send msg
SRpcMsg rpcMsg; syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
syncSnapshotSendDestroy(pMsg);
// event log // event log
sSTrace(pSender, "snapshot sender resend"); sSTrace(pSender, "snapshot sender resend");
...@@ -705,7 +704,9 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs ...@@ -705,7 +704,9 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// //
// condition 5, got data, update ack // condition 5, got data, update ack
// //
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotSend *pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
...@@ -797,13 +798,16 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) ...@@ -797,13 +798,16 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
} }
// update next index // update next index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
// update seq // update seq
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// build begin msg // build begin msg
SyncSnapshotSend *pSendMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId);
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId; pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
...@@ -816,11 +820,8 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) ...@@ -816,11 +820,8 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// send msg // send msg
SRpcMsg rpcMsg; syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg);
syncSnapshotSend2RpcMsg(pSendMsg, &rpcMsg);
syncNodeSendMsgById(&(pSendMsg->destId), pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, ""); syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "");
syncSnapshotSendDestroy(pSendMsg);
return 0; return 0;
} }
......
...@@ -409,6 +409,24 @@ void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg); ...@@ -409,6 +409,24 @@ void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
void syncApplyMsgLog(const SyncApplyMsg* pMsg); void syncApplyMsgLog(const SyncApplyMsg* pMsg);
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg); void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId);
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg);
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen);
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg);
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len);
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len);
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg);
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg);
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg);
char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg);
// for debug ----------------------
void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg);
void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -2392,3 +2392,69 @@ void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) { ...@@ -2392,3 +2392,69 @@ void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
} }
// ---------------------------------------------
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) {
uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
pMsg->dataLen = dataLen;
return pMsg;
}
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen);
}
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncSnapshotSendDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) {
syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册