From 363c98e9edfe65b0b85db24bec0cb259d2c0a6a9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 21 Jun 2022 17:45:08 +0800 Subject: [PATCH] refactor(sync): abstract for log store --- include/libs/sync/sync.h | 2 +- source/libs/sync/src/syncMain.c | 130 ++++++++++++++----- source/libs/sync/src/syncRaftLog.c | 53 +++++++- source/libs/sync/src/syncReplication.c | 17 ++- source/libs/sync/test/CMakeLists.txt | 14 ++ source/libs/sync/test/syncLogStoreCheck2.cpp | 76 +++++++++++ source/libs/sync/test/syncRaftLogTest2.cpp | 12 ++ source/libs/wal/src/walRead.c | 6 + 8 files changed, 264 insertions(+), 46 deletions(-) create mode 100644 source/libs/sync/test/syncLogStoreCheck2.cpp diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e963f25616..2cb0c65d35 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -162,7 +162,7 @@ typedef struct SSyncLogStore { SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); - bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index); + // bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e573c0a1af..3a0e6edeee 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1311,27 +1311,35 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { if (userStrLen < 256) { char logBuf[128 + 256]; - snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " - "replica-num:%d, " - "lconfig:%ld, changing:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing); + if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { + snprintf(logBuf, sizeof(logBuf), + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + } else { + snprintf(logBuf, sizeof(logBuf), "%s", str); + } sDebug("%s", logBuf); } else { int len = 128 + userStrLen; char* s = (char*)taosMemoryMalloc(len); - snprintf(s, len, - "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " - "replica-num:%d, " - "lconfig:%ld, changing:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing); + if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { + snprintf(s, len, + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + } else { + snprintf(s, len, "%s", str); + } sDebug("%s", s); taosMemoryFree(s); } @@ -1349,27 +1357,35 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { if (userStrLen < 256) { char logBuf[128 + 256]; - snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " - "replica-num:%d, " - "lconfig:%ld, changing:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing); + if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { + snprintf(logBuf, sizeof(logBuf), + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + } else { + snprintf(logBuf, sizeof(logBuf), "%s", str); + } sError("%s", logBuf); } else { int len = 128 + userStrLen; char* s = (char*)taosMemoryMalloc(len); - snprintf(s, len, - "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " - "replica-num:%d, " - "lconfig:%ld, changing:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, - pSyncNode->changing); + if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { + snprintf(s, len, + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + } else { + snprintf(s, len, "%s", str); + } sError("%s", s); taosMemoryFree(s); } @@ -1802,7 +1818,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { // snapshot -------------- bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool ret = false; - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { @@ -1812,6 +1828,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { return ret; } +#if 0 bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(syncNodeHasSnapshot(pSyncNode)); ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL); @@ -1822,6 +1839,7 @@ bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { bool b = (index <= snapshot.lastApplyIndex); return b; } +#endif SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -1838,7 +1856,7 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { SyncTerm lastTerm = 0; if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } @@ -1872,8 +1890,8 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(index >= SYNC_INDEX_BEGIN); - SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); + SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); if (index > syncStartIndex) { syncNodeLog3("syncNodeGetPreIndex", pSyncNode); ASSERT(0); @@ -1885,8 +1903,47 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(index >= SYNC_INDEX_BEGIN); + SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); + if (index > syncStartIndex) { + syncNodeLog3("syncNodeGetPreTerm", pSyncNode); + ASSERT(0); + } + + if (index == SYNC_INDEX_BEGIN) { + return 0; + } + + SyncTerm preTerm = 0; + SyncIndex preIndex = index - 1; + SSyncRaftEntry* pPreEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); + if (code == 0) { + ASSERT(pPreEntry != NULL); + preTerm = pPreEntry->term; + taosMemoryFree(pPreEntry); + return preTerm; + } else { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + if (snapshot.lastApplyIndex == preIndex) { + return snapshot.lastApplyTerm; + } + } + } + } + + ASSERT(0); + return -1; +} +#if 0 +SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { + ASSERT(index >= SYNC_INDEX_BEGIN); + + SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); if (index > syncStartIndex) { syncNodeLog3("syncNodeGetPreTerm", pSyncNode); ASSERT(0); @@ -1899,7 +1956,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm preTerm = 0; if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } @@ -1946,6 +2003,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { return preTerm; } +#endif // get pre index and term of "index" int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a169ea0221..9a4d47ee85 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -25,7 +25,7 @@ static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore); static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); -static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index); + static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore); static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); @@ -58,8 +58,6 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b return 0; } -int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; } - static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -81,6 +79,7 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { return count > 0 ? count : 0; } +#if 0 static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) { SyncIndex beginIndex = raftLogBeginIndex(pLogStore); SyncIndex endIndex = raftLogEndIndex(pLogStore); @@ -90,6 +89,7 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) { return false; } } +#endif static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { SyncIndex lastIndex; @@ -171,6 +171,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr return code; } +#if 0 static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -215,6 +216,49 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, return code; } +#endif + +static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + int32_t code; + + *ppEntry = NULL; + + SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + if (pWalHandle == NULL) { + return -1; + } + + code = walReadWithHandle(pWalHandle, index); + if (code != 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); + + walCloseReadHandle(pWalHandle); + return code; + } + + *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); + ASSERT(*ppEntry != NULL); + (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST; + (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType; + (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; + (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; + (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term; + (*ppEntry)->index = index; + ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen); + memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); + + // need to hold, do not new every time!! + walCloseReadHandle(pWalHandle); + + return code; +} static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; @@ -277,7 +321,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->syncLogEndIndex = raftLogEndIndex; pLogStore->syncLogIsEmpty = raftLogIsEmpty; pLogStore->syncLogEntryCount = raftLogEntryCount; - pLogStore->syncLogInRange = raftLogInRange; pLogStore->syncLogLastIndex = raftLogLastIndex; pLogStore->syncLogLastTerm = raftLogLastTerm; pLogStore->syncLogAppendEntry = raftLogAppendEntry; @@ -285,6 +328,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->syncLogTruncate = raftLogTruncate; pLogStore->syncLogWriteIndex = raftLogWriteIndex; + // pLogStore->syncLogInRange = raftLogInRange; + return pLogStore; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 06a74e8eee..d6e6fbe522 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -148,9 +148,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { SSyncRaftEntry* pEntry; int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); - ASSERT(code == 0); - if (pEntry != NULL) { + if (code == 0) { + ASSERT(pEntry != NULL); + pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); ASSERT(pMsg != NULL); @@ -164,9 +165,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { syncEntryDestory(pEntry); } else { - // no entry in log - pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - ASSERT(pMsg != NULL); + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + // no entry in log + pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + } else { + syncNodeLog3("", pSyncNode); + ASSERT(0); + } } // prepare msg diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2057aa23a4..27084286da 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -34,6 +34,7 @@ add_executable(syncWriteTest "") add_executable(syncReplicateTest "") add_executable(syncRefTest "") add_executable(syncLogStoreCheck "") +add_executable(syncLogStoreCheck2 "") add_executable(syncRaftCfgTest "") add_executable(syncRespMgrTest "") add_executable(syncSnapshotTest "") @@ -196,6 +197,10 @@ target_sources(syncLogStoreCheck PRIVATE "syncLogStoreCheck.cpp" ) +target_sources(syncLogStoreCheck2 + PRIVATE + "syncLogStoreCheck2.cpp" +) target_sources(syncRaftCfgTest PRIVATE "syncRaftCfgTest.cpp" @@ -442,6 +447,11 @@ target_include_directories(syncLogStoreCheck "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLogStoreCheck2 + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncRaftCfgTest PUBLIC "${TD_SOURCE_DIR}/include/libs/sync" @@ -668,6 +678,10 @@ target_link_libraries(syncLogStoreCheck sync gtest_main ) +target_link_libraries(syncLogStoreCheck2 + sync + gtest_main +) target_link_libraries(syncRaftCfgTest sync gtest_main diff --git a/source/libs/sync/test/syncLogStoreCheck2.cpp b/source/libs/sync/test/syncLogStoreCheck2.cpp new file mode 100644 index 0000000000..431b291ca7 --- /dev/null +++ b/source/libs/sync/test/syncLogStoreCheck2.cpp @@ -0,0 +1,76 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "wal.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void init() { + int code = walInit(); + assert(code == 0); +} + +void cleanup() { walCleanUp(); } + +SWal* createWal(char* path, int32_t vgId) { + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + SWal* pWal = walOpen(path, &walCfg); + assert(pWal != NULL); + return pWal; +} + +SSyncNode* createSyncNode(SWal* pWal) { + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); + memset(pSyncNode, 0, sizeof(SSyncNode)); + pSyncNode->pWal = pWal; + return pSyncNode; +} + +void usage(char* exe) { printf("usage: %s path vgId \n", exe); } + +int main(int argc, char** argv) { + if (argc != 3) { + usage(argv[0]); + exit(-1); + } + char* path = argv[1]; + int32_t vgId = atoi(argv[2]); + + init(); + SWal* pWal = createWal(path, vgId); + assert(pWal != NULL); + SSyncNode* pSyncNode = createSyncNode(pWal); + assert(pSyncNode != NULL); + + SSyncLogStore* pLog = logStoreCreate(pSyncNode); + assert(pLog != NULL); + + logStorePrint2((char*)"==syncLogStoreCheck==", pLog); + + walClose(pWal); + logStoreDestory(pLog); + taosMemoryFree(pSyncNode); + + cleanup(); + return 0; +} diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index 32ff441a6f..bd6fa7c2c3 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -69,6 +69,7 @@ void test1() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); if (gAssert) { @@ -88,6 +89,7 @@ void test1() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore); if (gAssert) { @@ -110,6 +112,7 @@ void test2() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; pLogStore->syncLogSetBeginIndex(pLogStore, 5); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); @@ -130,6 +133,7 @@ void test2() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore); if (gAssert) { @@ -152,6 +156,7 @@ void test3() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); if (gAssert) { @@ -198,6 +203,7 @@ void test3() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore); if (gAssert) { @@ -220,6 +226,7 @@ void test4() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); pLogStore->syncLogSetBeginIndex(pLogStore, 5); @@ -257,6 +264,7 @@ void test4() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore); if (gAssert) { @@ -279,6 +287,7 @@ void test5() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); pLogStore->syncLogSetBeginIndex(pLogStore, 5); @@ -329,6 +338,7 @@ void test5() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore); if (gAssert) { @@ -351,6 +361,7 @@ void test6() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore); pLogStore->syncLogSetBeginIndex(pLogStore, 5); @@ -401,6 +412,7 @@ void test6() { init(); pLogStore = logStoreCreate(pSyncNode); assert(pLogStore); + pSyncNode->pLogStore = pLogStore; logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore); if (gAssert) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index e940191cea..b1129b441c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -259,6 +259,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int64_t code; + + if (pRead->pWal->vers.firstVer == -1) { + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + // TODO: check wal life if (pRead->curVersion != ver) { if (walReadSeekVer(pRead, ver) < 0) { -- GitLab