提交 aea4e0f4 编写于 作者: M Minghao Li

sync refactor

上级 3ce39eec
...@@ -87,7 +87,10 @@ ...@@ -87,7 +87,10 @@
// //
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term); syncNodeUpdateTerm(ths, pMsg->term);
......
...@@ -37,7 +37,10 @@ ...@@ -37,7 +37,10 @@
// //
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
......
...@@ -50,9 +50,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -50,9 +50,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// update commit index // update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex; SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex; for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
++index) { --index) {
bool agree = syncAgree(pSyncNode, index); bool agree = syncAgree(pSyncNode, index);
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld", agree, index); sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
if (agree) { if (agree) {
// term // term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
......
...@@ -43,7 +43,10 @@ ...@@ -43,7 +43,10 @@
// //
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term); syncNodeUpdateTerm(ths, pMsg->term);
......
...@@ -38,7 +38,10 @@ ...@@ -38,7 +38,10 @@
// //
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
......
...@@ -116,10 +116,9 @@ int main(int argc, char** argv) { ...@@ -116,10 +116,9 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
while (1) { while (1) {
sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace("elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock,
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000);
} }
return 0; return 0;
......
...@@ -119,7 +119,7 @@ SRpcMsg *step0(int i) { ...@@ -119,7 +119,7 @@ SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg)); memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999; pMsg->msgType = 9999;
pMsg->contLen = 32; pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen); pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg; return pMsg;
...@@ -172,14 +172,14 @@ int main(int argc, char **argv) { ...@@ -172,14 +172,14 @@ int main(int argc, char **argv) {
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
sTrace("replicate sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock,
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
} }
while (1) { while (1) {
sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock, gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock,
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000); taosMsleep(1000);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册