From 2adc0b8a76116d2a29a83648fc7a3e86cec503f2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 5 Jun 2022 21:48:50 +0800 Subject: [PATCH] enh(sync): add log index manager --- include/libs/sync/sync.h | 1 + source/libs/sync/src/syncRaftLog.c | 71 ++++++--- source/libs/sync/test/syncRaftLogTest2.cpp | 163 ++++++++++++++++++--- 3 files changed, 192 insertions(+), 43 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c8213160d2..9a6d42e5bd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -149,6 +149,7 @@ typedef struct SSyncLogStore { int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index); + SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 0178755651..55900c1e89 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -20,6 +20,7 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex); static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); +static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore); static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index); @@ -70,7 +71,7 @@ static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) { static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { SyncIndex beginIndex = raftLogBeginIndex(pLogStore); SyncIndex endIndex = raftLogEndIndex(pLogStore); - int32_t count = endIndex - beginIndex; + int32_t count = endIndex - beginIndex + 1; return count > 0 ? count : 0; } @@ -85,22 +86,48 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) { } static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { + SyncIndex lastIndex; SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastVer = walGetLastVer(pWal); - return lastVer; + SyncIndex firstVer = walGetFirstVer(pWal); + + if (lastVer < firstVer) { + // no record + lastIndex = -1; + + } else { + if (firstVer >= 0) { + lastIndex = lastVer; + } else if (firstVer == -1) { + lastIndex = -1; + } else { + ASSERT(0); + } + } + + return lastIndex; +} + +static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastVer = walGetLastVer(pWal); + return lastVer + 1; } static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { SyncTerm lastTerm = 0; - if (raftLogEntryCount == 0) { + if (raftLogEntryCount(pLogStore) == 0) { lastTerm = 0; } else { SSyncRaftEntry* pLastEntry; int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry); ASSERT(code == 0); - lastTerm = pLastEntry->term; - taosMemoryFree(pLastEntry); + if (pLastEntry != NULL) { + lastTerm = pLastEntry->term; + taosMemoryFree(pLastEntry); + } } return lastTerm; } @@ -109,8 +136,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - SyncIndex lastIndex = raftLogLastIndex(pLogStore); - ASSERT(pEntry->index == lastIndex + 1); + SyncIndex writeIndex = raftLogWriteIndex(pLogStore); + ASSERT(pEntry->index == writeIndex); int code = 0; SSyncLogMeta syncMeta; @@ -194,8 +221,9 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn } static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { + *ppLastEntry = NULL; if (raftLogEntryCount(pLogStore) == 0) { - return -1; + return 0; } SyncIndex lastIndex = raftLogLastIndex(pLogStore); int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); @@ -243,6 +271,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->syncLogAppendEntry = raftLogAppendEntry; pLogStore->syncLogGetEntry = raftLogGetEntry; pLogStore->syncLogTruncate = raftLogTruncate; + pLogStore->syncLogWriteIndex = raftLogWriteIndex; return pLogStore; } @@ -401,35 +430,29 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore)); cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore)); cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); + + SyncIndex endIndex = raftLogEndIndex(pLogStore); + snprintf(u64buf, sizeof(u64buf), "%ld", endIndex); + cJSON_AddStringToObject(pRoot, "endIndex", u64buf); + + int32_t count = raftLogEntryCount(pLogStore); + cJSON_AddNumberToObject(pRoot, "entryCount", count); cJSON* pEntries = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "pEntries", pEntries); - SyncIndex lastIndex = logStoreLastIndex(pLogStore); - for (SyncIndex i = pData->beginIndex; i <= lastIndex; ++i) { + for (SyncIndex i = pData->beginIndex; i <= endIndex; ++i) { SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); syncEntryDestory(pEntry); } - - /* - for (SyncIndex i = 0; i <= lastIndex; ++i) { - SyncIndex walFirstVer = walGetFirstVer(pData->pWal); - - if (i != SYNC_INDEX_INVALID && i >= walFirstVer) { - SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); - cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); - syncEntryDestory(pEntry); - } - } - */ } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index 27e1009335..aaa59189f9 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -22,6 +22,16 @@ SWal* pWal; SSyncLogStore* pLogStore; const char* pWalPath = "./syncLogStoreTest_wal"; +SyncIndex gSnapshotLastApplyIndex; +SyncIndex gSnapshotLastApplyTerm; + +int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex; + pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm; + return 0; +} + void init() { walInit(); taosRemoveDir(pWalPath); @@ -41,6 +51,9 @@ void init() { pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode)); pSyncNode->pWal = pWal; + + pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); + pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; } void cleanup() { @@ -49,14 +62,37 @@ void cleanup() { taosMemoryFree(pSyncNode); } -void logStoreTest() { +void test1() { + init(); + + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); + logStoreDestory(pLogStore); + + cleanup(); +} + +void test2() { + init(); + pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID); + pLogStore->syncLogSetBeginIndex(pLogStore, 5); + logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); + logStoreDestory(pLogStore); + + cleanup(); +} + +void test3() { + init(); - logStoreLog2((char*)"logStoreTest", pLogStore); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i <= 4; ++i) { int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); assert(pEntry != NULL); @@ -65,34 +101,123 @@ void logStoreTest() { pEntry->seqNum = 3; pEntry->isWeak = true; pEntry->term = 100 + i; - pEntry->index = pLogStore->getLastIndex(pLogStore) + 1; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); snprintf(pEntry->data, dataLen, "value%d", i); - syncEntryLog2((char*)"==write entry== :", pEntry); - pLogStore->appendEntry(pLogStore, pEntry); + pLogStore->syncLogAppendEntry(pLogStore, pEntry); syncEntryDestory(pEntry); - - if (i == 0) { - assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN); - } } - logStoreLog2((char*)"after appendEntry", pLogStore); + logStoreLog2((char*)"test3 after appendEntry", pLogStore); + logStoreDestory(pLogStore); + + cleanup(); +} + +void test4() { + init(); + + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); + pLogStore->syncLogSetBeginIndex(pLogStore, 5); - pLogStore->truncate(pLogStore, 3); - logStoreLog2((char*)"after truncate 3", pLogStore); + for (int i = 5; i <= 9; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); + snprintf(pEntry->data, dataLen, "value%d", i); + pLogStore->syncLogAppendEntry(pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test4 after appendEntry", pLogStore); logStoreDestory(pLogStore); + + cleanup(); } -int main(int argc, char** argv) { - tsAsyncLog = 0; - sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; +void test5() { + init(); + + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); + pLogStore->syncLogSetBeginIndex(pLogStore, 5); + + for (int i = 5; i <= 9; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); + snprintf(pEntry->data, dataLen, "value%d", i); + + pLogStore->syncLogAppendEntry(pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test5 after appendEntry", pLogStore); + + pLogStore->syncLogTruncate(pLogStore, 7); + logStoreLog2((char*)"after truncate 7", pLogStore); + + logStoreDestory(pLogStore); + + cleanup(); +} +void test6() { init(); - logStoreTest(); - taosMsleep(2000); + pLogStore = logStoreCreate(pSyncNode); + assert(pLogStore); + logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore); + pLogStore->syncLogSetBeginIndex(pLogStore, 5); + + for (int i = 5; i <= 9; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100 + i; + pEntry->index = pLogStore->syncLogWriteIndex(pLogStore); + snprintf(pEntry->data, dataLen, "value%d", i); + + pLogStore->syncLogAppendEntry(pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStoreLog2((char*)"test6 after appendEntry", pLogStore); + + pLogStore->syncLogTruncate(pLogStore, 5); + logStoreLog2((char*)"after truncate 5", pLogStore); + + logStoreDestory(pLogStore); + cleanup(); +} + +int main(int argc, char** argv) { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; + + test1(); + test2(); + test3(); + test4(); + test5(); + test6(); return 0; } -- GitLab