未验证 提交 bd471495 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18081 from taosdata/fix/TD-20052

enh: adjust request vote msg
...@@ -34,9 +34,7 @@ extern "C" { ...@@ -34,9 +34,7 @@ extern "C" {
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pNode);
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -324,18 +324,9 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); ...@@ -324,18 +324,9 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
......
...@@ -28,8 +28,6 @@ typedef enum ESyncTimeoutType { ...@@ -28,8 +28,6 @@ typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_HEARTBEAT, SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType; } ESyncTimeoutType;
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
typedef struct SyncTimeout { typedef struct SyncTimeout {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -40,9 +38,6 @@ typedef struct SyncTimeout { ...@@ -40,9 +38,6 @@ typedef struct SyncTimeout {
void* data; // need optimized void* data; // need optimized
} SyncTimeout; } SyncTimeout;
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode);
typedef struct SyncClientRequest { typedef struct SyncClientRequest {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -54,47 +49,6 @@ typedef struct SyncClientRequest { ...@@ -54,47 +49,6 @@ typedef struct SyncClientRequest {
char data[]; // origin RpcMsg.pCont char data[]; // origin RpcMsg.pCont
} SyncClientRequest; } SyncClientRequest;
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
// ---------------------------------------------
typedef struct SRaftMeta {
uint64_t seqNum;
bool isWeak;
} SRaftMeta;
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont pointer)
typedef struct SyncClientRequestBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
uint32_t dataCount;
uint32_t dataLen;
char data[]; // block2, block3
} SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequestReply { typedef struct SyncClientRequestReply {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -103,7 +57,6 @@ typedef struct SyncClientRequestReply { ...@@ -103,7 +57,6 @@ typedef struct SyncClientRequestReply {
SRaftId leaderHint; SRaftId leaderHint;
} SyncClientRequestReply; } SyncClientRequestReply;
// ---------------------------------------------
typedef struct SyncRequestVote { typedef struct SyncRequestVote {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -116,25 +69,6 @@ typedef struct SyncRequestVote { ...@@ -116,25 +69,6 @@ typedef struct SyncRequestVote {
SyncTerm lastLogTerm; SyncTerm lastLogTerm;
} SyncRequestVote; } SyncRequestVote;
SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg);
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
void syncRequestVoteLog(const SyncRequestVote* pMsg);
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
// ---------------------------------------------
typedef struct SyncRequestVoteReply { typedef struct SyncRequestVoteReply {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -146,27 +80,6 @@ typedef struct SyncRequestVoteReply { ...@@ -146,27 +80,6 @@ typedef struct SyncRequestVoteReply {
bool voteGranted; bool voteGranted;
} SyncRequestVoteReply; } SyncRequestVoteReply;
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
// ---------------------------------------------
// data: entry
typedef struct SyncAppendEntries { typedef struct SyncAppendEntries {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -202,47 +115,6 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); ...@@ -202,47 +115,6 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
// ---------------------------------------------
typedef struct SOffsetAndContLen {
int32_t offset;
int32_t contLen;
} SOffsetAndContLen;
// data:
// block1: SOffsetAndContLen Array
// block2: entry Array
typedef struct SyncAppendEntriesBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
SyncIndex commitIndex;
SyncTerm privateTerm;
int32_t dataCount;
uint32_t dataLen;
char data[]; // block1, block2
} SyncAppendEntriesBatch;
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
// ---------------------------------------------
typedef struct SyncAppendEntriesReply { typedef struct SyncAppendEntriesReply {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -573,9 +445,8 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg); ...@@ -573,9 +445,8 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg);
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
// on message ---------------------- // on message ----------------------
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
...@@ -598,6 +469,16 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); ...@@ -598,6 +469,16 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode);
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -44,7 +44,7 @@ extern "C" { ...@@ -44,7 +44,7 @@ extern "C" {
// m) // m)
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -39,7 +39,7 @@ extern "C" { ...@@ -39,7 +39,7 @@ extern "C" {
// /\ Discard(m) // /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// //
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,6 +33,41 @@ ...@@ -33,6 +33,41 @@
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) {
sNTrace(pNode, "not candidate, stop elect");
return 0;
}
int32_t ret = 0;
for (int i = 0; i < pNode->peersNum; ++i) {
SRpcMsg rpcMsg = {0};
ret = syncBuildRequestVote(&rpcMsg, pNode->vgId);
ASSERT(ret == 0);
SyncRequestVote* pMsg = rpcMsg.pCont;
pMsg->srcId = pNode->myRaftId;
pMsg->destId = pNode->peersId[i];
pMsg->term = pNode->pRaftStore->currentTerm;
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
ASSERT(ret == 0);
ret = syncNodeSendMsgById(&pNode->peersId[i], pNode, &rpcMsg);
ASSERT(ret == 0);
}
return ret;
}
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "begin election"); sNTrace(pSyncNode, "begin election");
...@@ -81,36 +116,3 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -81,36 +116,3 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
return ret; return ret;
} }
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
sNTrace(pSyncNode, "not candidate, stop elect");
return 0;
}
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->term = pSyncNode->pRaftStore->currentTerm;
ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
ASSERT(ret == 0);
ret = syncNodeSendRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg);
}
return ret;
}
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
int32_t ret = 0;
syncLogSendRequestVote(pSyncNode, pMsg, "");
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret;
}
\ No newline at end of file
...@@ -146,13 +146,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -146,13 +146,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL); code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); syncNodeOnRequestVote(pSyncNode, pMsg);
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
...@@ -1654,9 +1650,13 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) ...@@ -1654,9 +1650,13 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
// simulate get vote from outside // simulate get vote from outside
void syncNodeVoteForSelf(SSyncNode* pSyncNode) { void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId); SRpcMsg rpcMsg = {0};
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
if (ret != 0) return;
SyncRequestVoteReply* pMsg = rpcMsg.pCont;
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->pRaftStore->currentTerm;
...@@ -1664,11 +1664,9 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { ...@@ -1664,11 +1664,9 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
voteGrantedVote(pSyncNode->pVotesGranted, pMsg); voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
votesRespondAdd(pSyncNode->pVotesRespond, pMsg); votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
syncRequestVoteReplyDestroy(pMsg); rpcFreeCont(rpcMsg.pCont);
} }
// snapshot --------------
// return if has a snapshot // return if has a snapshot
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
...@@ -2535,39 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* ...@@ -2535,39 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
} }
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
...@@ -2591,30 +2556,6 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -2591,30 +2556,6 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
pMsg->dataLen, s); pMsg->dataLen, s);
} }
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-batch to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
}
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-batch from %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
......
...@@ -88,393 +88,38 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const ...@@ -88,393 +88,38 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
return 0; return 0;
} }
// ---- message process SyncClientRequestBatch---- int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncRequestVote);
// block1: pMsg->pCont = rpcMallocCont(bytes);
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId) {
ASSERT(rpcMsgPArr != NULL);
ASSERT(arrSize > 0);
int32_t dataLen = 0;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize;
dataLen += (raftMetaArrayLen + rpcArrayLen);
uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen;
SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH;
pMsg->dataCount = arrSize;
pMsg->dataLen = dataLen;
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int i = 0; i < arrSize; ++i) {
// init raftMetaArr
raftMetaArr[i].isWeak = raftArr[i].isWeak;
raftMetaArr[i].seqNum = raftArr[i].seqNum;
// init msgArr
msgArr[i] = *(rpcMsgPArr[i]);
}
return pMsg;
}
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pSyncMsg->msgType;
pRpcMsg->contLen = pSyncMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen);
}
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
int32_t arrSize = pMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int i = 0; i < arrSize; ++i) {
if (msgArr[i].pCont != NULL) {
rpcFreeCont(msgArr[i].pCont);
}
}
taosMemoryFree(pMsg);
}
}
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) {
SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data);
return raftMetaArr;
}
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) {
int32_t arrSize = pSyncMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen);
return msgArr;
}
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) {
SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen);
ASSERT(pSyncMsg != NULL);
memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pRpcMsg->contLen == pSyncMsg->bytes);
return pSyncMsg;
}
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncRequestVote);
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
return pMsg; pMsg->contLen = bytes;
} if (pMsg->pCont == NULL) {
terrno = TDMT_SYNC_REQUEST_VOTE;
void syncRequestVoteDestroy(SyncRequestVote* pMsg) { return -1;
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex);
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm);
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); SyncRequestVote* pRequestVote = pMsg->pCont;
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); pRequestVote->bytes = bytes;
return pJson; pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE;
} pRequestVote->vgId = vgId;
return 0;
char* syncRequestVote2Str(const SyncRequestVote* pMsg) {
cJSON* pJson = syncRequestVote2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteLog(const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVote2Str(pMsg);
sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
// ---- message process SyncRequestVoteReply---- int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { int32_t bytes = sizeof(SyncRequestVoteReply);
uint32_t bytes = sizeof(SyncRequestVoteReply); pMsg->pCont = rpcMallocCont(bytes);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
return pMsg; pMsg->contLen = bytes;
} if (pMsg->pCont == NULL) {
terrno = TDMT_SYNC_REQUEST_VOTE;
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { return -1;
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) {
syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
} }
cJSON* pJson = cJSON_CreateObject(); SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); pRequestVoteReply->bytes = bytes;
return pJson; pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
} pRequestVoteReply->vgId = vgId;
return 0;
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
// ---- message process SyncAppendEntries---- // ---- message process SyncAppendEntries----
...@@ -648,113 +293,6 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { ...@@ -648,113 +293,6 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
} }
} }
// ---- message process SyncAppendEntriesBatch----
// block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array
// block3: entry Array
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
ASSERT(entryPArr != NULL);
ASSERT(arrSize >= 0);
int32_t dataLen = 0;
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
int32_t entryArrayLen = 0;
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
SSyncRaftEntry* pEntry = entryPArr[i];
entryArrayLen += pEntry->bytes;
}
dataLen += (metaArrayLen + entryArrayLen);
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH;
pMsg->dataCount = arrSize;
pMsg->dataLen = dataLen;
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
char* pData = pMsg->data;
for (int i = 0; i < arrSize; ++i) {
// init meta <offset, contLen>
if (i == 0) {
metaArr[i].offset = metaArrayLen;
metaArr[i].contLen = entryPArr[i]->bytes;
} else {
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
metaArr[i].contLen = entryPArr[i]->bytes;
}
// init entry array
ASSERT(metaArr[i].contLen == entryPArr[i]->bytes);
memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen);
}
return pMsg;
}
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) {
return (SOffsetAndContLen*)(pMsg->data);
}
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen);
}
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncAppendEntriesBatchDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) {
syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
// ---- message process SyncAppendEntriesReply---- // ---- message process SyncAppendEntriesReply----
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncAppendEntriesReply); uint32_t bytes = sizeof(SyncAppendEntriesReply);
......
...@@ -88,8 +88,26 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -88,8 +88,26 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
return false; return false;
} }
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
...@@ -121,7 +139,11 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -121,7 +139,11 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
} }
// send msg // send msg
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); SRpcMsg rpcMsg = {0};
ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId);
ASSERT(ret == 0 );
SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
...@@ -135,10 +157,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -135,10 +157,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
syncLogSendRequestVoteReply(ths, pReply, ""); syncLogSendRequestVoteReply(ths, pReply, "");
} while (0); } while (0);
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return 0; return 0;
} }
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
// TLA+ Spec // TLA+ Spec
...@@ -36,8 +37,18 @@ ...@@ -36,8 +37,18 @@
// /\ Discard(m) // /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// //
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVoteReply* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
......
//#include <gtest/gtest.h>
#include "syncTest.h" #include "syncTest.h"
#include "syncBatch.h"
void logTest() { void logTest() {
sTrace("--- sync log test: trace"); sTrace("--- sync log test: trace");
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_BATCH_H
#define _TD_LIBS_SYNC_BATCH_H
#ifdef __cplusplus
extern "C" {
#endif
#include "syncInt.h"
// ---------------------------------------------
typedef struct SRaftMeta {
uint64_t seqNum;
bool isWeak;
} SRaftMeta;
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont pointer)
typedef struct SyncClientRequestBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
uint32_t dataCount;
uint32_t dataLen;
char data[]; // block2, block3
} SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
typedef struct SOffsetAndContLen {
int32_t offset;
int32_t contLen;
} SOffsetAndContLen;
// data:
// block1: SOffsetAndContLen Array
// block2: entry Array
typedef struct SyncAppendEntriesBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
SyncIndex commitIndex;
SyncTerm privateTerm;
int32_t dataCount;
uint32_t dataLen;
char data[]; // block1, block2
} SyncAppendEntriesBatch;
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
// ---------------------------------------------
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_INT_H*/
...@@ -241,8 +241,44 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMs ...@@ -241,8 +241,44 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMs
void syncClientRequestLog(const SyncClientRequest* pMsg); void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg);
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
void syncRequestVoteLog(const SyncRequestVote* pMsg);
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/ #endif /*_TD_LIBS_SYNC_TEST_H*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
#include "syncBatch.h"
// ---- message process SyncClientRequestBatch----
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId) {
ASSERT(rpcMsgPArr != NULL);
ASSERT(arrSize > 0);
int32_t dataLen = 0;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize;
dataLen += (raftMetaArrayLen + rpcArrayLen);
uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen;
SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH;
pMsg->dataCount = arrSize;
pMsg->dataLen = dataLen;
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int i = 0; i < arrSize; ++i) {
// init raftMetaArr
raftMetaArr[i].isWeak = raftArr[i].isWeak;
raftMetaArr[i].seqNum = raftArr[i].seqNum;
// init msgArr
msgArr[i] = *(rpcMsgPArr[i]);
}
return pMsg;
}
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pSyncMsg->msgType;
pRpcMsg->contLen = pSyncMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen);
}
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) {
if (pMsg != NULL) {
int32_t arrSize = pMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
for (int i = 0; i < arrSize; ++i) {
if (msgArr[i].pCont != NULL) {
rpcFreeCont(msgArr[i].pCont);
}
}
taosMemoryFree(pMsg);
}
}
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) {
SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data);
return raftMetaArr;
}
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) {
int32_t arrSize = pSyncMsg->dataCount;
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen);
return msgArr;
}
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) {
SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen);
ASSERT(pSyncMsg != NULL);
memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pRpcMsg->contLen == pSyncMsg->bytes);
return pSyncMsg;
}
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg);
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg);
cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pMeta = cJSON_CreateObject();
cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum);
cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak);
cJSON_AddItemToArray(pMetaArr, pMeta);
}
cJSON* pMsgArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]);
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
}
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot);
return pJson;
}
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) {
cJSON* pJson = syncClientRequestBatch2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sTrace("syncClientRequestBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sLTrace("syncClientRequestBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---- message process SyncAppendEntriesBatch----
// block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array
// block3: entry Array
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
ASSERT(entryPArr != NULL);
ASSERT(arrSize >= 0);
int32_t dataLen = 0;
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
int32_t entryArrayLen = 0;
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
SSyncRaftEntry* pEntry = entryPArr[i];
entryArrayLen += pEntry->bytes;
}
dataLen += (metaArrayLen + entryArrayLen);
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH;
pMsg->dataCount = arrSize;
pMsg->dataLen = dataLen;
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
char* pData = pMsg->data;
for (int i = 0; i < arrSize; ++i) {
// init meta <offset, contLen>
if (i == 0) {
metaArr[i].offset = metaArrayLen;
metaArr[i].contLen = entryPArr[i]->bytes;
} else {
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
metaArr[i].contLen = entryPArr[i]->bytes;
}
// init entry array
ASSERT(metaArr[i].contLen == entryPArr[i]->bytes);
memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen);
}
return pMsg;
}
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) {
return (SOffsetAndContLen*)(pMsg->data);
}
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen);
}
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncAppendEntriesBatchDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) {
syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex);
cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm);
cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
int32_t entryArrayLen = pMsg->dataLen - metaArrayLen;
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen);
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pMeta = cJSON_CreateObject();
cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset);
cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen);
cJSON_AddItemToArray(pMetaArr, pMeta);
}
cJSON* pEntryArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
cJSON* pEntryJson = syncEntry2Json(pEntry);
cJSON_AddItemToArray(pEntryArr, pEntryJson);
}
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot);
return pJson;
}
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
cJSON* pJson = syncAppendEntriesBatch2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
printf("syncAppendEntriesBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
printf("syncAppendEntriesBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
sTrace("syncAppendEntriesBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
sLTrace("syncAppendEntriesBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-batch to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
}
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-batch from %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
}
\ No newline at end of file
...@@ -695,138 +695,6 @@ void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) { ...@@ -695,138 +695,6 @@ void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) {
} }
} }
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->prevLogIndex);
cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->prevLogTerm);
cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
int32_t entryArrayLen = pMsg->dataLen - metaArrayLen;
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen);
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pMeta = cJSON_CreateObject();
cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset);
cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen);
cJSON_AddItemToArray(pMetaArr, pMeta);
}
cJSON* pEntryArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
cJSON* pEntryJson = syncEntry2Json(pEntry);
cJSON_AddItemToArray(pEntryArr, pEntryJson);
}
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot);
return pJson;
}
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
cJSON* pJson = syncAppendEntriesBatch2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
printf("syncAppendEntriesBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
printf("syncAppendEntriesBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
sTrace("syncAppendEntriesBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncAppendEntriesBatch2Str(pMsg);
sLTrace("syncAppendEntriesBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
char u64buf[128]; char u64buf[128];
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
...@@ -982,86 +850,6 @@ char* syncClientRequest2Str(const SyncClientRequest* pMsg) { ...@@ -982,86 +850,6 @@ char* syncClientRequest2Str(const SyncClientRequest* pMsg) {
return serialized; return serialized;
} }
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg);
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg);
cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pMeta = cJSON_CreateObject();
cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum);
cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak);
cJSON_AddItemToArray(pMetaArr, pMeta);
}
cJSON* pMsgArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]);
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
}
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
taosMemoryFree(s);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot);
return pJson;
}
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) {
cJSON* pJson = syncClientRequestBatch2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
printf("syncClientRequestBatchPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sTrace("syncClientRequestBatchLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncClientRequestBatch2Str(pMsg);
sLTrace("syncClientRequestBatchLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// for debug ---------------------- // for debug ----------------------
void syncClientRequestPrint(const SyncClientRequest* pMsg) { void syncClientRequestPrint(const SyncClientRequest* pMsg) {
char* serialized = syncClientRequest2Str(pMsg); char* serialized = syncClientRequest2Str(pMsg);
...@@ -1221,3 +1009,302 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { ...@@ -1221,3 +1009,302 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
} }
// ---- message process SyncRequestVote----
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncRequestVote);
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
return pMsg;
}
void syncRequestVoteDestroy(SyncRequestVote* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex);
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm);
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot);
return pJson;
}
char* syncRequestVote2Str(const SyncRequestVote* pMsg) {
cJSON* pJson = syncRequestVote2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteLog(const SyncRequestVote* pMsg) {
char* serialized = syncRequestVote2Str(pMsg);
sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVote2Str(pMsg);
sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ---- message process SyncRequestVoteReply----
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncRequestVoteReply);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
return pMsg;
}
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) {
syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot);
return pJson;
}
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册