diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 4fedcdb3b13f88b3355be4569c3c4581ad405c6d..352649f16ffa39a707b3e04fb28f4e42bdd9f02a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -212,13 +212,17 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); // snapshot -------------- -bool syncNodeHasSnapshot(SSyncNode* pSyncNode); -bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index); +bool syncNodeHasSnapshot(SSyncNode* pSyncNode); +bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index); + SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); + +SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode); + SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index); -SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index); +SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); // for debug -------------- diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 05e7bd34eb21f729fdb3d07cceabdb12f184ba3c..04db39ec09a714aae59a8e62db6b07bf21ad8f2d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1285,9 +1285,14 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { } bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { + ASSERT(syncNodeHasSnapshot(pSyncNode)); + ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); + ASSERT(index >= SYNC_INDEX_BEGIN); + SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); bool b = (index <= snapshot.lastApplyIndex); + return b; } @@ -1303,18 +1308,33 @@ SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { } SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); - } - SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - SyncTerm lastTerm = 0; - if (logLastIndex >= snapshot.lastApplyIndex) { - lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); + if (syncNodeHasSnapshot(pSyncNode)) { + // has snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + } + + if (pSyncNode->pLogStore->syncLogEntryCount(pSyncNode->pLogStore) > 0) { + // has log + SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + if (logLastIndex > snapshot.lastApplyIndex) { + lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); + } else { + lastTerm = snapshot.lastApplyTerm; + } + + } else { + // no log + lastTerm = snapshot.lastApplyTerm; + } + } else { - lastTerm = snapshot.lastApplyTerm; + // no snapshot + lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); } + return lastTerm; } @@ -1325,39 +1345,89 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy return 0; } +SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { + SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1; + return syncStartIndex; +} + SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(index >= SYNC_INDEX_BEGIN); - SyncIndex preIndex = index - 1; + SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); + ASSERT(index <= syncStartIndex); + + SyncIndex preIndex; + if (syncNodeHasSnapshot(pSyncNode)) { + // has snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + } + + ASSERT(index > snapshot.lastApplyIndex); + preIndex = index - 1; + + } else { + // no snapshot + preIndex = index - 1; + } + return preIndex; } -SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index) { - SyncTerm preTerm = 0; - SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index); +SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { + ASSERT(index >= SYNC_INDEX_BEGIN); + SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); + ASSERT(index <= syncStartIndex); - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (index == SYNC_INDEX_BEGIN) { + return 0; } - if (syncNodeIsIndexInSnapshot(pSyncNode, preIndex) && preIndex == snapshot.lastApplyIndex) { - preTerm = snapshot.lastApplyTerm; - } else { - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); - ASSERT(code == 0); - if (pPreEntry != NULL) { + SyncTerm preTerm = 0; + if (syncNodeHasSnapshot(pSyncNode)) { + // has snapshot + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + } + + ASSERT(index > snapshot.lastApplyIndex); + if (index > snapshot.lastApplyIndex + 1) { + // should be log preTerm + SSyncRaftEntry* pPreEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); + ASSERT(code == 0); + ASSERT(pPreEntry != NULL); + preTerm = pPreEntry->term; taosMemoryFree(pPreEntry); + + } else if (index == snapshot.lastApplyIndex + 1) { + preTerm = snapshot.lastApplyTerm; + } else { + ASSERT(0); } + + } else { + // no snapshot + ASSERT(index > SYNC_INDEX_BEGIN); + + SSyncRaftEntry* pPreEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); + ASSERT(code == 0); + ASSERT(pPreEntry != NULL); + + preTerm = pPreEntry->term; + taosMemoryFree(pPreEntry); } + return preTerm; } // get pre index and term of "index" int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) { *pPreIndex = syncNodeGetPreIndex(pSyncNode, index); - *pPreTerm = syncNodeGetPreITerm(pSyncNode, index); + *pPreTerm = syncNodeGetPreTerm(pSyncNode, index); return 0; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 55900c1e89cb729909e0c0793f3ad30189858edf..b021c3bf590b6df8374bf47a8f1dc5f49f4eb255 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -437,11 +437,11 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); - + SyncIndex endIndex = raftLogEndIndex(pLogStore); snprintf(u64buf, sizeof(u64buf), "%ld", endIndex); cJSON_AddStringToObject(pRoot, "endIndex", u64buf); - + int32_t count = raftLogEntryCount(pLogStore); cJSON_AddNumberToObject(pRoot, "entryCount", count); diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index 911948363bf07e07b2aa26a7657603c98289c60f..a23384df968bf3b1f424fbb942ff9f035cc89d4a 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -34,7 +34,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void init() { walInit(); - + SWalCfg walCfg; memset(&walCfg, 0, sizeof(SWalCfg)); walCfg.vgId = 1000; diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index 27e1009335a82e42e0d8e8e1b00f8cfe709e9534..d5f9d91a510a22dea565902f5c4aec21a060f52e 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -22,9 +22,18 @@ SWal* pWal; SSyncLogStore* pLogStore; const char* pWalPath = "./syncLogStoreTest_wal"; +SyncIndex gSnapshotLastApplyIndex; +SyncIndex gSnapshotLastApplyTerm; + +int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex; + pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm; + return 0; +} + void init() { walInit(); - taosRemoveDir(pWalPath); SWalCfg walCfg; memset(&walCfg, 0, sizeof(SWalCfg)); @@ -41,6 +50,9 @@ void init() { pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode)); pSyncNode->pWal = pWal; + + pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); + pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; } void cleanup() { @@ -49,14 +61,56 @@ void cleanup() { taosMemoryFree(pSyncNode); } -void logStoreTest() { +void test1() { + // no snapshot + // no log + + taosRemoveDir(pWalPath); + + init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID); + pSyncNode->pLogStore = pLogStore; + logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); + + gSnapshotLastApplyIndex = -1; + gSnapshotLastApplyTerm = 0; + + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); + + SyncIndex testIndex = 0; + SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, testIndex); + SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, testIndex); - logStoreLog2((char*)"logStoreTest", pLogStore); + sTrace("test1"); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("lastIndex: %ld", lastIndex); + sTrace("lastTerm: %lu", lastTerm); + sTrace("%ld's preIndex: %ld", testIndex, preIndex); + sTrace("%ld's preTerm: %lu", testIndex, preTerm); - for (int i = 0; i < 5; ++i) { + logStoreDestory(pLogStore); + cleanup(); +} + +void test2() { + // no snapshot + // whole log + + taosRemoveDir(pWalPath); + + init(); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + pSyncNode->pLogStore = pLogStore; + logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); + + for (int i = 0; i <= 10; ++i) { int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); assert(pEntry != NULL); @@ -65,34 +119,200 @@ void logStoreTest() { pEntry->seqNum = 3; pEntry->isWeak = true; pEntry->term = 100 + i; - pEntry->index = pLogStore->getLastIndex(pLogStore) + 1; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); snprintf(pEntry->data, dataLen, "value%d", i); - syncEntryLog2((char*)"==write entry== :", pEntry); - pLogStore->appendEntry(pLogStore, pEntry); + pLogStore->syncLogAppendEntry(pLogStore, pEntry); syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test2 after appendEntry", pLogStore); + + gSnapshotLastApplyIndex = -1; + gSnapshotLastApplyTerm = 0; + + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); - if (i == 0) { - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN); - } + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); + + sTrace("test2"); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("lastIndex: %ld", lastIndex); + sTrace("lastTerm: %lu", lastTerm); + + for (SyncIndex i = 11; i >= 0; --i) { + SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); + SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); + + sTrace("%ld's preIndex: %ld", i, preIndex); + sTrace("%ld's preTerm: %lu", i, preTerm); } - logStoreLog2((char*)"after appendEntry", pLogStore); - pLogStore->truncate(pLogStore, 3); - logStoreLog2((char*)"after truncate 3", pLogStore); + logStoreDestory(pLogStore); + cleanup(); + +} + +void test3() { + // has snapshot + // no log + + taosRemoveDir(pWalPath); + + init(); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + pSyncNode->pLogStore = pLogStore; + logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); + + gSnapshotLastApplyIndex = 5; + gSnapshotLastApplyTerm = 100; + + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); + + SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, 6); + SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, 6); + + sTrace("test3"); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("lastIndex: %ld", lastIndex); + sTrace("lastTerm: %lu", lastTerm); + sTrace("%d's preIndex: %ld", 6, preIndex); + sTrace("%d's preTerm: %lu", 6, preTerm); logStoreDestory(pLogStore); + cleanup(); } -int main(int argc, char** argv) { - tsAsyncLog = 0; - sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; +void test4() { + // has snapshot + // whole log + + taosRemoveDir(pWalPath); init(); - logStoreTest(); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + pSyncNode->pLogStore = pLogStore; + logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); - taosMsleep(2000); + for (int i = 0; i <= 10; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); + snprintf(pEntry->data, dataLen, "value%d", i); + + pLogStore->syncLogAppendEntry(pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test4 after appendEntry", pLogStore); + + gSnapshotLastApplyIndex = 5; + gSnapshotLastApplyTerm = 100; + + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); + + sTrace("test4"); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("lastIndex: %ld", lastIndex); + sTrace("lastTerm: %lu", lastTerm); + + for (SyncIndex i = 11; i >= 6; --i) { + SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); + SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); + + sTrace("%ld's preIndex: %ld", i, preIndex); + sTrace("%ld's preTerm: %lu", i, preTerm); + } + + logStoreDestory(pLogStore); cleanup(); +} + +void test5() { + // has snapshot + // partial log + + taosRemoveDir(pWalPath); + + init(); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + pSyncNode->pLogStore = pLogStore; + logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); + + pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6); + for (int i = 6; i <= 10; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); + snprintf(pEntry->data, dataLen, "value%d", i); + + pLogStore->syncLogAppendEntry(pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test5 after appendEntry", pLogStore); + + gSnapshotLastApplyIndex = 5; + gSnapshotLastApplyTerm = 100; + + bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); + + sTrace("test5"); + sTrace("hasSnapshot:%d, lastApplyIndex:%ld, lastApplyTerm:%lu", hasSnapshot, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + sTrace("lastIndex: %ld", lastIndex); + sTrace("lastTerm: %lu", lastTerm); + + for (SyncIndex i = 11; i >= 6; --i) { + SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); + SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); + + sTrace("%ld's preIndex: %ld", i, preIndex); + sTrace("%ld's preTerm: %lu", i, preTerm); + } + + logStoreDestory(pLogStore); + cleanup(); +} + +int main(int argc, char** argv) { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; + + test1(); + test2(); + test3(); + test4(); + test5(); return 0; }