提交 cd19ff01 编写于 作者: S Shengliang Guan

enh: adjust request vote reply

上级 8cca68f2
...@@ -324,11 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); ...@@ -324,11 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
......
...@@ -80,27 +80,6 @@ typedef struct SyncRequestVoteReply { ...@@ -80,27 +80,6 @@ typedef struct SyncRequestVoteReply {
bool voteGranted; bool voteGranted;
} SyncRequestVoteReply; } SyncRequestVoteReply;
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
// ---------------------------------------------
// data: entry
typedef struct SyncAppendEntries { typedef struct SyncAppendEntries {
uint32_t bytes; uint32_t bytes;
int32_t vgId; int32_t vgId;
...@@ -478,7 +457,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); ...@@ -478,7 +457,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
// on message ---------------------- // on message ----------------------
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
...@@ -509,6 +488,7 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR ...@@ -509,6 +488,7 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR
bool isWeak, int32_t vgId); bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -39,7 +39,7 @@ extern "C" { ...@@ -39,7 +39,7 @@ extern "C" {
// /\ Discard(m) // /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// //
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -148,9 +148,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -148,9 +148,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
syncNodeOnRequestVote(pSyncNode, pMsg); syncNodeOnRequestVote(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pMsg);
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
...@@ -376,7 +374,7 @@ bool syncIsReadyForRead(int64_t rid) { ...@@ -376,7 +374,7 @@ bool syncIsReadyForRead(int64_t rid) {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SSyncRaftEntry* pEntry = NULL; SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry( int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) { if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true; ready = true;
...@@ -1652,9 +1650,13 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) ...@@ -1652,9 +1650,13 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
// simulate get vote from outside // simulate get vote from outside
void syncNodeVoteForSelf(SSyncNode* pSyncNode) { void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &pSyncNode->myRaftId);
SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(pSyncNode->vgId); SRpcMsg rpcMsg = {0};
int32_t ret = syncBuildRequestVoteReply(&rpcMsg, pSyncNode->vgId);
if (ret == 0) return;
SyncRequestVoteReply* pMsg = rpcMsg.pCont;
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->myRaftId; pMsg->destId = pSyncNode->myRaftId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->pRaftStore->currentTerm;
...@@ -1662,11 +1664,9 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { ...@@ -1662,11 +1664,9 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
voteGrantedVote(pSyncNode->pVotesGranted, pMsg); voteGrantedVote(pSyncNode->pVotesGranted, pMsg);
votesRespondAdd(pSyncNode->pVotesRespond, pMsg); votesRespondAdd(pSyncNode->pVotesRespond, pMsg);
syncRequestVoteReplyDestroy(pMsg); rpcFreeCont(rpcMsg.pCont);
} }
// snapshot --------------
// return if has a snapshot // return if has a snapshot
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
...@@ -2533,31 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* ...@@ -2533,31 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
} }
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64]; char host[64];
uint16_t port; uint16_t port;
......
...@@ -105,152 +105,21 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { ...@@ -105,152 +105,21 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
return 0; return 0;
} }
// ---- message process SyncRequestVoteReply---- int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { int32_t bytes = sizeof(SyncRequestVoteReply);
uint32_t bytes = sizeof(SyncRequestVoteReply); pMsg->pCont = rpcMallocCont(bytes);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
return pMsg; pMsg->contLen = bytes;
} if (pMsg->pCont == NULL) {
terrno = TDMT_SYNC_REQUEST_VOTE;
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { return -1;
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) {
syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
} }
cJSON* pJson = cJSON_CreateObject(); SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); pRequestVoteReply->bytes = bytes;
return pJson; pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
} pRequestVoteReply->vgId = vgId;
return 0;
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
// ---- message process SyncAppendEntries---- // ---- message process SyncAppendEntries----
......
...@@ -88,6 +88,23 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -88,6 +88,23 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
return false; return false;
} }
static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont; SyncRequestVote* pMsg = pRpcMsg->pCont;
...@@ -122,7 +139,11 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -122,7 +139,11 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// send msg // send msg
SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); SRpcMsg rpcMsg = {0};
ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId);
ASSERT(ret);
SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
...@@ -136,10 +157,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -136,10 +157,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogSendRequestVoteReply(ths, pReply, ""); syncLogSendRequestVoteReply(ths, pReply, "");
} while (0); } while (0);
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return 0; return 0;
} }
\ No newline at end of file
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
// TLA+ Spec // TLA+ Spec
...@@ -36,8 +37,18 @@ ...@@ -36,8 +37,18 @@
// /\ Discard(m) // /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// //
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncRequestVoteReply* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
......
...@@ -259,6 +259,24 @@ void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); ...@@ -259,6 +259,24 @@ void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
void syncRequestVoteLog(const SyncRequestVote* pMsg); void syncRequestVoteLog(const SyncRequestVote* pMsg);
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1159,4 +1159,152 @@ void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { ...@@ -1159,4 +1159,152 @@ void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
}
// ---- message process SyncRequestVoteReply----
SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncRequestVoteReply);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
return pMsg;
}
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncRequestVoteReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) {
syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot);
return pJson;
}
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
printf("syncRequestVoteReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncRequestVoteReply2Str(pMsg);
sTrace("syncRequestVoteReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册