提交 54e05aee 编写于 作者: S Shengliang Guan

enh: adjust sync append entry reply

上级 437daf8c
...@@ -35,7 +35,7 @@ extern "C" { ...@@ -35,7 +35,7 @@ extern "C" {
// /\ Discard(m) // /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>> // /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
// //
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -112,24 +112,6 @@ typedef struct SyncAppendEntriesReply { ...@@ -112,24 +112,6 @@ typedef struct SyncAppendEntriesReply {
int64_t startTime; int64_t startTime;
} SyncAppendEntriesReply; } SyncAppendEntriesReply;
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncHeartbeat { typedef struct SyncHeartbeat {
uint32_t bytes; uint32_t bytes;
...@@ -430,7 +412,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); ...@@ -430,7 +412,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
...@@ -460,6 +442,7 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const ...@@ -460,6 +442,7 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -138,6 +138,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -138,6 +138,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
...@@ -146,7 +147,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -146,7 +147,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); int32_t code = syncBuildRequestVoteReply(&rpcRsp, ths->vgId);
if (code != 0) {
syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
goto _IGNORE;
}
SyncAppendEntriesReply* pReply = rpcRsp.pCont;
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
...@@ -288,7 +295,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -288,7 +295,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _SEND_RESPONSE; goto _SEND_RESPONSE;
_IGNORE: _IGNORE:
syncAppendEntriesReplyDestroy(pReply); rpcFreeCont(rpcRsp.pCont);
return 0; return 0;
_SEND_RESPONSE: _SEND_RESPONSE:
...@@ -296,10 +303,6 @@ _SEND_RESPONSE: ...@@ -296,10 +303,6 @@ _SEND_RESPONSE:
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
// send response // send response
SRpcMsg rpcMsg; syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return 0; return 0;
} }
\ No newline at end of file
...@@ -73,8 +73,9 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync ...@@ -73,8 +73,9 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
#endif #endif
} }
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
......
...@@ -152,9 +152,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -152,9 +152,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
syncNodeOnAppendEntries(pSyncNode, pMsg); syncNodeOnAppendEntries(pSyncNode, pMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
......
...@@ -93,7 +93,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { ...@@ -93,7 +93,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
pMsg->contLen = bytes; pMsg->contLen = bytes;
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
terrno = TDMT_SYNC_REQUEST_VOTE; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -110,7 +110,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { ...@@ -110,7 +110,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
pMsg->contLen = bytes; pMsg->contLen = bytes;
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
terrno = TDMT_SYNC_REQUEST_VOTE; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -139,160 +139,21 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { ...@@ -139,160 +139,21 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
return 0; return 0;
} }
// ---- message process SyncAppendEntriesReply---- int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { int32_t bytes = sizeof(SyncRequestVoteReply);
uint32_t bytes = sizeof(SyncAppendEntriesReply); pMsg->pCont = rpcMallocCont(bytes);
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
return pMsg; pMsg->contLen = bytes;
} if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { return -1;
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncAppendEntriesReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) {
syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); pAppendEntriesReply->bytes = bytes;
return pJson; pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
} pAppendEntriesReply->vgId = vgId;
return 0;
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) {
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
} }
// ---- message process SyncHeartbeat---- // ---- message process SyncHeartbeat----
......
...@@ -295,6 +295,24 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); ...@@ -295,6 +295,24 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1479,3 +1479,159 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { ...@@ -1479,3 +1479,159 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
} }
// ---- message process SyncAppendEntriesReply----
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncAppendEntriesReply);
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
return pMsg;
}
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) {
ASSERT(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) {
memcpy(pMsg, buf, len);
ASSERT(len == pMsg->bytes);
}
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
ASSERT(pMsg != NULL);
syncAppendEntriesReplyDeserialize(buf, len, pMsg);
ASSERT(len == pMsg->bytes);
return pMsg;
}
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) {
syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
ASSERT(pMsg != NULL);
return pMsg;
}
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
{
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
{
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
return pJson;
}
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) {
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncAppendEntriesReply2Str(pMsg);
sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册