提交 94076e59 编写于 作者: S Shengliang Guan

refact: adjust sync append entry

上级 152b1ea9
...@@ -434,11 +434,9 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); ...@@ -434,11 +434,9 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
SSyncNode* pNode); int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginalRpc, uint64_t seq, bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
bool isWeak, int32_t vgId);
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
......
...@@ -601,7 +601,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -601,7 +601,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId); int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr()); sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
...@@ -1794,7 +1794,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1794,7 +1794,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pNode = param; SSyncNode* pNode = param;
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
pNode->pingTimerMS, pNode); pNode->pingTimerMS, pNode);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to build ping msg"); sNError(pNode, "failed to build ping msg");
...@@ -1824,7 +1824,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1824,7 +1824,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
SSyncNode* pNode = pElectTimer->pSyncNode; SSyncNode* pNode = pElectTimer->pSyncNode;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to build elect msg"); sNError(pNode, "failed to build elect msg");
...@@ -1863,7 +1863,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1863,7 +1863,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (pNode->replicaNum > 1) { if (pNode->replicaNum > 1) {
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock), int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
pNode->heartbeatTimerMS, pNode); pNode->heartbeatTimerMS, pNode);
if (code != 0) { if (code != 0) {
...@@ -1970,7 +1970,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { ...@@ -1970,7 +1970,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
if (pEntry == NULL) return -1; if (pEntry == NULL) return -1;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId); int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
sNTrace(pNode, "propose msg, type:noop"); sNTrace(pNode, "propose msg, type:noop");
......
...@@ -20,18 +20,18 @@ ...@@ -20,18 +20,18 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "tcoding.h" #include "tcoding.h"
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
SSyncNode* pNode) { SSyncNode* pNode) {
int32_t bytes = sizeof(SyncTimeout); int32_t bytes = sizeof(SyncTimeout);
pTimeoutRpcMsg->pCont = rpcMallocCont(bytes); pMsg->pCont = rpcMallocCont(bytes);
pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT; pMsg->msgType = TDMT_SYNC_TIMEOUT;
pTimeoutRpcMsg->contLen = bytes; pMsg->contLen = bytes;
if (pTimeoutRpcMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont; SyncTimeout* pTimeout = pMsg->pCont;
pTimeout->bytes = bytes; pTimeout->bytes = bytes;
pTimeout->msgType = TDMT_SYNC_TIMEOUT; pTimeout->msgType = TDMT_SYNC_TIMEOUT;
pTimeout->vgId = pNode->vgId; pTimeout->vgId = pNode->vgId;
...@@ -42,41 +42,40 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, ...@@ -42,41 +42,40 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType,
return 0; return 0;
} }
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
bool isWeak, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; pMsg->pCont = rpcMallocCont(bytes);
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->contLen = bytes;
pClientRequestRpcMsg->contLen = bytes; if (pMsg->pCont == NULL) {
if (pClientRequestRpcMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; SyncClientRequest* pClientRequest = pMsg->pCont;
pClientRequest->bytes = bytes; pClientRequest->bytes = bytes;
pClientRequest->vgId = vgId; pClientRequest->vgId = vgId;
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequest->originalRpcType = pOriginalRpcMsg->msgType; pClientRequest->originalRpcType = pOriginal->msgType;
pClientRequest->seqNum = seqNum; pClientRequest->seqNum = seqNum;
pClientRequest->isWeak = isWeak; pClientRequest->isWeak = isWeak;
pClientRequest->dataLen = pOriginalRpcMsg->contLen; pClientRequest->dataLen = pOriginal->contLen;
memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
return 0; return 0;
} }
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes; int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); pMsg->pCont = rpcMallocCont(bytes);
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequestRpcMsg->contLen = bytes; pMsg->contLen = bytes;
if (pClientRequestRpcMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; SyncClientRequest* pClientRequest = pMsg->pCont;
pClientRequest->bytes = bytes; pClientRequest->bytes = bytes;
pClientRequest->vgId = vgId; pClientRequest->vgId = vgId;
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
......
...@@ -45,7 +45,7 @@ SyncClientRequest *createSyncClientRequest() { ...@@ -45,7 +45,7 @@ SyncClientRequest *createSyncClientRequest() {
strcpy((char *)rpcMsg.pCont, "hello rpc"); strcpy((char *)rpcMsg.pCont, "hello rpc");
SRpcMsg clientRequestMsg; SRpcMsg clientRequestMsg;
syncClientRequestBuildFromRpcMsg(&clientRequestMsg, &rpcMsg, 123, true, 1000); syncBuildClientRequest(&clientRequestMsg, &rpcMsg, 123, true, 1000);
SyncClientRequest *pMsg = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen); SyncClientRequest *pMsg = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
memcpy(pMsg->data, clientRequestMsg.pCont, clientRequestMsg.contLen); memcpy(pMsg->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
return pMsg; return pMsg;
......
...@@ -154,7 +154,7 @@ SRpcMsg *step0() { ...@@ -154,7 +154,7 @@ SRpcMsg *step0() {
SyncClientRequest *step1(const SRpcMsg *pMsg) { SyncClientRequest *step1(const SRpcMsg *pMsg) {
SRpcMsg clientRequestMsg; SRpcMsg clientRequestMsg;
syncClientRequestBuildFromRpcMsg(&clientRequestMsg, pMsg, 123, true, 1000); syncBuildClientRequest(&clientRequestMsg, pMsg, 123, true, 1000);
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen); SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
memcpy(pMsg2->data, clientRequestMsg.pCont, clientRequestMsg.contLen); memcpy(pMsg2->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
return pMsg2; return pMsg2;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册