diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2fc1cc07edb229b3e64f90b145215cde58ce419e..6d0971a1fa03ecd03a6fe68cc95512e8f6a5fe83 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -434,92 +434,43 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return ret; } -static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { - return true; - } - - SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); - if (pMsg->prevLogIndex > myLastIndex) { - return false; - } - - SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); - if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { - return true; - } - - return false; -} - -static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry, - bool* pEntryAlreadyWritten) { +static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t code; - *ppAppendEntry = NULL; - *pEntryAlreadyWritten = false; - - // not conflict by default - bool conflict = false; - - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry; - code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry); - ASSERT(pExtraEntry != NULL); - - *ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ASSERT(*ppAppendEntry != NULL); - - ASSERT(extraIndex == (*ppAppendEntry)->index); - if (pExtraEntry->term != (*ppAppendEntry)->term) { - // log not match, conflict, need delete - conflict = true; - } else { - // log match, already written - ASSERT(extraIndex == (*ppAppendEntry)->index && pExtraEntry->term == (*ppAppendEntry)->term); - *pEntryAlreadyWritten = true; - sInfo("entry already written, term:%lu, index:%ld", pExtraEntry->term, pExtraEntry->index); - } - syncEntryDestory(pExtraEntry); - - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; - - sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); - - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry; - code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry); - ASSERT(code == 0); - ASSERT(pRollBackEntry != NULL); - if (syncUtilUserRollback(pRollBackEntry->msgType)) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + SyncIndex delBegin = pMsg->prevLogIndex + 1; + SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - SFsmCbMeta cbMeta; - cbMeta.index = pRollBackEntry->index; - cbMeta.isWeak = pRollBackEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pRollBackEntry->seqNum; - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); - rpcFreeCont(rpcMsg.pCont); - } + // invert roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry; + code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry); + ASSERT(code == 0); + ASSERT(pRollBackEntry != NULL); - syncEntryDestory(pRollBackEntry); + if (syncUtilUserRollback(pRollBackEntry->msgType)) { + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + + SFsmCbMeta cbMeta; + cbMeta.index = pRollBackEntry->index; + cbMeta.isWeak = pRollBackEntry->isWeak; + cbMeta.code = 0; + cbMeta.state = ths->state; + cbMeta.seqNum = pRollBackEntry->seqNum; + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta); + rpcFreeCont(rpcMsg.pCont); } - } - // delete confict entries - code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex); - ASSERT(code == 0); + syncEntryDestory(pRollBackEntry); + } } - return 0; + // delete confict entries + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); + ASSERT(code == 0); + + return code; } static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { @@ -540,6 +491,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { return 0; } +// really pre log match +// prevLogIndex == -1 +static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { + if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) { + return true; + } + + SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); + if (pMsg->prevLogIndex > myLastIndex) { + return false; + } + + SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1); + if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) { + return true; + } + + return false; +} + int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; int32_t code = 0; @@ -549,7 +520,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); syncAppendEntriesLog2(logBuf, pMsg); - // if I am standby, be added into a raft group, I should process SyncAppendEntries msg + // if I am standby, to be added into a raft group, I should process SyncAppendEntries msg /* // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { @@ -573,57 +544,55 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); - // case1, reject request - if ((pMsg->term < ths->pRaftStore->currentTerm) || - ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { - sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term, - ths->pRaftStore->currentTerm, ths->state, logOK); - - // send response - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - - return ret; - } - - // case 2, return to follower state - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { - sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); - - syncNodeBecomeFollower(ths); - - // ret or reply? - return ret; - } - - // case 3, index in my snapshot - if (pMsg->term == ths->pRaftStore->currentTerm && syncNodeHasSnapshot(ths)) { + // candidate to follower + // + // operation: + // to follower + do { + bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; + if (condition) { + syncNodeBecomeFollower(ths); + // do not reply? + return ret; + } + } while (0); + + // fake match + // + // condition1: + // I have snapshot, no log, preIndex > myLastIndex + // + // condition2: + // I have snapshot, have log, log <= snapshot, preIndex > myLastIndex + // + // condition3: + // I have snapshot, preIndex <= snapshot.lastApplyIndex + // + // operation: + // match snapshot.lastApplyIndex - 1; + // no operation on log + do { + SyncIndex myLastIndex = syncNodeGetLastIndex(ths); SSnapshot snapshot; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); - if (pMsg->prevLogIndex < snapshot.lastApplyIndex) { - sTrace( - "recv SyncAppendEntries, accept, in snapshot, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, " - "snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && + syncNodeHasSnapshot(ths); + bool condition1 = + condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); + bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) && + (pMsg->prevLogIndex > myLastIndex); + bool condition3 = condition0 && (pMsg->prevLogIndex <= snapshot.lastApplyIndex); + bool condition = condition1 || condition2 || condition3; + + if (condition) { // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; pReply->matchIndex = snapshot.lastApplyIndex - 1; // send response @@ -634,105 +603,120 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } - } - - // case 4, accept request - if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - // has extra entries (> preIndex) in local log - SyncIndex myLastIndex = syncNodeGetLastIndex(ths); - bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; + } while (0); + + // not match + // + // condition1: + // term < myTerm + // + // condition2: + // !logOK + // + // operation: + // not match + // no operation on log + do { + bool condition1 = pMsg->term < ths->pRaftStore->currentTerm; + bool condition2 = + (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK; + bool condition = condition1 || condition2; + + if (condition) { + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = false; + pReply->matchIndex = SYNC_INDEX_INVALID; - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); - sTrace( - "recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, " - "hasExtraEntries:%d, hasAppendEntries:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); + return ret; + } + } while (0); + + // really match + // + // condition: + // logOK + // + // operation: + // match + // make log same + do { + bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK; + if (condition) { + // has extra entries (> preIndex) in local log + SyncIndex myLastIndex = syncNodeGetLastIndex(ths); + bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; + + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; + + if (hasExtraEntries) { + // make log same, rollback deleted entries + code = syncNodeMakeLogSame(ths, pMsg); + ASSERT(code == 0); + } - if (hasExtraEntries && hasAppendEntries) { - // make log same - SSyncRaftEntry* pAppendEntry; - bool entryAlreadyWritten; - code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry, &entryAlreadyWritten); - ASSERT(code == 0); - ASSERT(pAppendEntry != NULL); + if (hasAppendEntries) { + // append entry + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ASSERT(pAppendEntry != NULL); - if (!entryAlreadyWritten) { - // append new entries code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(code == 0); // pre commit code = syncNodePreCommit(ths, pAppendEntry); ASSERT(code == 0); - } - - syncEntryDestory(pAppendEntry); - - } else if (hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else if (!hasExtraEntries && hasAppendEntries) { - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - ASSERT(pAppendEntry != NULL); - - // append new entries - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - ASSERT(code == 0); - - // pre commit - code = syncNodePreCommit(ths, pAppendEntry); - ASSERT(code == 0); - syncEntryDestory(pAppendEntry); - - } else if (!hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else { - ASSERT(0); - } - - // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + syncEntryDestory(pAppendEntry); + } - if (hasAppendEntries) { - pReply->matchIndex = pMsg->prevLogIndex + 1; - } else { - pReply->matchIndex = pMsg->prevLogIndex; - } + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; + pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; - // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); - // maybe update commit index from leader - if (pMsg->commitIndex > ths->commitIndex) { - // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; + // maybe update commit index, leader notice me + if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local + if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; - // update commit index - ths->commitIndex = pMsg->commitIndex; + // update commit index + ths->commitIndex = pMsg->commitIndex; - // call back Wal - code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - ASSERT(code == 0); + // call back Wal + code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + ASSERT(code == 0); - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); - ASSERT(code == 0); + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); + } } + return ret; } - } + } while (0); return ret; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ad76b0d6a852d717d47917115e2195c283b359ce..acbe658914c22d7e43339c02ca431886315a1457 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1283,7 +1283,6 @@ bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); bool b = (index <= snapshot.lastApplyIndex); - return b; } @@ -1307,17 +1306,10 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); } - if (pSyncNode->pLogStore->syncLogEntryCount(pSyncNode->pLogStore) > 0) { - // has log - SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - if (logLastIndex > snapshot.lastApplyIndex) { - lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); - } else { - lastTerm = snapshot.lastApplyTerm; - } - + SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + if (logLastIndex > snapshot.lastApplyIndex) { + lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); } else { - // no log lastTerm = snapshot.lastApplyTerm; } @@ -1346,22 +1338,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); ASSERT(index <= syncStartIndex); - SyncIndex preIndex; - if (syncNodeHasSnapshot(pSyncNode)) { - // has snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); - } - - // ASSERT(index > snapshot.lastApplyIndex); - preIndex = index - 1; - - } else { - // no snapshot - preIndex = index - 1; - } - + SyncIndex preIndex = index - 1; return preIndex; } @@ -1382,7 +1359,6 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); } - // ASSERT(index > snapshot.lastApplyIndex); if (index > snapshot.lastApplyIndex + 1) { // should be log preTerm SSyncRaftEntry* pPreEntry = NULL; @@ -1395,10 +1371,19 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { } else if (index == snapshot.lastApplyIndex + 1) { preTerm = snapshot.lastApplyTerm; + } else { - // ASSERT(0); // maybe snapshot change - preTerm = snapshot.lastApplyTerm; + sError("sync get pre term, bad scene. index:%ld", index); + logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore); + + SSyncRaftEntry* pPreEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); + ASSERT(code == 0); + ASSERT(pPreEntry != NULL); + + preTerm = pPreEntry->term; + taosMemoryFree(pPreEntry); } } else { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d55f0b80d5a83c6c8078c773c2cdd96a279f9dfe..e03f8ba65f330d6677076cd6dfa9c644fcd1c4bf 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -437,10 +437,6 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); @@ -452,6 +448,17 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { int32_t count = raftLogEntryCount(pLogStore); cJSON_AddNumberToObject(pRoot, "entryCount", count); + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore)); + cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + cJSON* pEntries = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "pEntries", pEntries); @@ -484,10 +491,6 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); - cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); @@ -498,6 +501,17 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { int32_t count = raftLogEntryCount(pLogStore); cJSON_AddNumberToObject(pRoot, "entryCount", count); + + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore)); + cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index a23384df968bf3b1f424fbb942ff9f035cc89d4a..d05401b0f39f7d44d2a17215c3d667e1cf1965b3 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -32,6 +32,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } +bool gAssert = true; + void init() { walInit(); @@ -68,6 +70,17 @@ void test1() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 0); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 0); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); @@ -76,6 +89,17 @@ void test1() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 0); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 0); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); } @@ -88,6 +112,17 @@ void test2() { assert(pLogStore); pLogStore->syncLogSetBeginIndex(pLogStore, 5); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); @@ -96,6 +131,17 @@ void test2() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); } @@ -108,6 +154,16 @@ void test3() { assert(pLogStore); logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 0); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 0); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + for (int i = 0; i <= 4; ++i) { int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); @@ -124,6 +180,17 @@ void test3() { syncEntryDestory(pEntry); } logStoreLog2((char*)"test3 after appendEntry", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 0); + assert(pLogStore->syncLogEndIndex(pLogStore) == 4); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 4); + assert(pLogStore->syncLogLastTerm(pLogStore) == 104); + } + logStoreDestory(pLogStore); cleanup(); @@ -132,6 +199,17 @@ void test3() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 0); + assert(pLogStore->syncLogEndIndex(pLogStore) == 4); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 4); + assert(pLogStore->syncLogLastTerm(pLogStore) == 104); + } + logStoreDestory(pLogStore); cleanup(); } @@ -161,6 +239,17 @@ void test4() { syncEntryDestory(pEntry); } logStoreLog2((char*)"test4 after appendEntry", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 9); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 10); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 9); + assert(pLogStore->syncLogLastTerm(pLogStore) == 109); + } + logStoreDestory(pLogStore); cleanup(); @@ -169,6 +258,17 @@ void test4() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 9); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 10); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 9); + assert(pLogStore->syncLogLastTerm(pLogStore) == 109); + } + logStoreDestory(pLogStore); cleanup(); } @@ -199,9 +299,29 @@ void test5() { } logStoreLog2((char*)"test5 after appendEntry", pLogStore); + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 9); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 10); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 9); + assert(pLogStore->syncLogLastTerm(pLogStore) == 109); + } + pLogStore->syncLogTruncate(pLogStore, 7); logStoreLog2((char*)"after truncate 7", pLogStore); + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 6); + assert(pLogStore->syncLogEntryCount(pLogStore) == 2); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 7); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 6); + assert(pLogStore->syncLogLastTerm(pLogStore) == 106); + } + logStoreDestory(pLogStore); cleanup(); @@ -210,6 +330,17 @@ void test5() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 6); + assert(pLogStore->syncLogEntryCount(pLogStore) == 2); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 7); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 6); + assert(pLogStore->syncLogLastTerm(pLogStore) == 106); + } + logStoreDestory(pLogStore); cleanup(); } @@ -240,9 +371,29 @@ void test6() { } logStoreLog2((char*)"test6 after appendEntry", pLogStore); + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == 9); + assert(pLogStore->syncLogEntryCount(pLogStore) == 5); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 10); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 1); + assert(pLogStore->syncLogLastIndex(pLogStore) == 9); + assert(pLogStore->syncLogLastTerm(pLogStore) == 109); + } + pLogStore->syncLogTruncate(pLogStore, 5); logStoreLog2((char*)"after truncate 5", pLogStore); + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); @@ -251,6 +402,17 @@ void test6() { pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore); + + if (gAssert) { + assert(pLogStore->syncLogBeginIndex(pLogStore) == 5); + assert(pLogStore->syncLogEndIndex(pLogStore) == -1); + assert(pLogStore->syncLogEntryCount(pLogStore) == 0); + assert(pLogStore->syncLogWriteIndex(pLogStore) == 5); + assert(pLogStore->syncLogIsEmpty(pLogStore) == 0); + assert(pLogStore->syncLogLastIndex(pLogStore) == -1); + assert(pLogStore->syncLogLastTerm(pLogStore) == 0); + } + logStoreDestory(pLogStore); cleanup(); } @@ -259,6 +421,11 @@ int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; + if (argc == 2) { + gAssert = atoi(argv[1]); + } + sTrace("gAssert : %d", gAssert); + test1(); test2(); test3(); diff --git a/source/libs/sync/test/syncTimeoutTest.cpp b/source/libs/sync/test/syncTimeoutTest.cpp index 6125b24c1a446940f7121a8a94d9332d70fbf616..e60fabe38bd007d3f7d0b62bbe275cfacc1cc10b 100644 --- a/source/libs/sync/test/syncTimeoutTest.cpp +++ b/source/libs/sync/test/syncTimeoutTest.cpp @@ -80,7 +80,7 @@ void test5() { void test6() { SyncTimeout *pMsg = createMsg(); - char * jsonStr = syncTimeout2Str(pMsg); + char * jsonStr = syncTimeout2Str(pMsg); sTrace("jsonStr: %s", jsonStr); syncUtilJson2Line(jsonStr);