提交 4d123c4d 编写于 作者: M Minghao Li

refactor(sync): add interface in sync

上级 2ae4081a
......@@ -698,6 +698,13 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
// -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
......
......@@ -385,6 +385,84 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
code = vnodeSetStandBy(pVnode);
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1;
}
#if 0
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
......@@ -506,6 +584,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = -1;
}
}
#endif
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
code);
......
......@@ -96,6 +96,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -44,6 +44,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -76,7 +76,7 @@ typedef struct SSyncTimer {
uint64_t counter;
int32_t timerMS;
SRaftId destId;
void *pData;
void* pData;
} SSyncTimer;
int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId);
......
......@@ -52,6 +52,8 @@ extern "C" {
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -47,6 +47,8 @@ extern "C" {
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -97,6 +97,9 @@ char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -1041,3 +1041,5 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { return 0; }
\ No newline at end of file
......@@ -413,4 +413,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} while (0);
return 0;
}
\ No newline at end of file
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { return 0; }
\ No newline at end of file
......@@ -932,13 +932,12 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0;
if (syncEnvIsStart()) {
SSyncHbTimerData *pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
pData->pSyncNode = pSyncNode;
pData->pTimer = pSyncTimer;
pData->destId = pSyncTimer->destId;
pData->logicClock = pSyncTimer->logicClock;
pSyncTimer->pData = pData;
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer);
} else {
......@@ -952,7 +951,7 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
taosTmrStop(pSyncTimer->pTimer);
pSyncTimer->pTimer = NULL;
//taosMemoryFree(pSyncTimer->pData);
// taosMemoryFree(pSyncTimer->pData);
return ret;
}
......
......@@ -213,4 +213,6 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
syncRequestVoteReplyDestroy(pReply);
return 0;
}
\ No newline at end of file
}
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { return 0; }
\ No newline at end of file
......@@ -155,4 +155,6 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
}
return 0;
}
\ No newline at end of file
}
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { return 0; }
\ No newline at end of file
......@@ -901,3 +901,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
return 0;
}
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg) { return 0; }
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg) { return 0; }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册