提交 363c98e9 编写于 作者: M Minghao Li

refactor(sync): abstract for log store

上级 ecc43b66
......@@ -162,7 +162,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
// bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
......
......@@ -1311,27 +1311,35 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
if (userStrLen < 256) {
char logBuf[128 + 256];
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
sDebug("%s", logBuf);
} else {
int len = 128 + userStrLen;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
} else {
snprintf(s, len, "%s", str);
}
sDebug("%s", s);
taosMemoryFree(s);
}
......@@ -1349,27 +1357,35 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
if (userStrLen < 256) {
char logBuf[128 + 256];
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
sError("%s", logBuf);
} else {
int len = 128 + userStrLen;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing);
} else {
snprintf(s, len, "%s", str);
}
sError("%s", s);
taosMemoryFree(s);
}
......@@ -1802,7 +1818,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
......@@ -1812,6 +1828,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
return ret;
}
#if 0
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(syncNodeHasSnapshot(pSyncNode));
ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
......@@ -1822,6 +1839,7 @@ bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
bool b = (index <= snapshot.lastApplyIndex);
return b;
}
#endif
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
......@@ -1838,7 +1856,7 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
SyncTerm lastTerm = 0;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
}
......@@ -1872,8 +1890,8 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreIndex", pSyncNode);
ASSERT(0);
......@@ -1885,8 +1903,47 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreTerm", pSyncNode);
ASSERT(0);
}
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
SyncTerm preTerm = 0;
SyncIndex preIndex = index - 1;
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
if (code == 0) {
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
}
ASSERT(0);
return -1;
}
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreTerm", pSyncNode);
ASSERT(0);
......@@ -1899,7 +1956,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm preTerm = 0;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
}
......@@ -1946,6 +2003,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
return preTerm;
}
#endif
// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
......
......@@ -25,7 +25,7 @@ static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
......@@ -58,8 +58,6 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b
return 0;
}
int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; }
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -81,6 +79,7 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
return count > 0 ? count : 0;
}
#if 0
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
......@@ -90,6 +89,7 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
return false;
}
}
#endif
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SyncIndex lastIndex;
......@@ -171,6 +171,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return code;
}
#if 0
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -215,6 +216,49 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return code;
}
#endif
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int32_t code;
*ppEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
if (pWalHandle == NULL) {
return -1;
}
code = walReadWithHandle(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
walCloseReadHandle(pWalHandle);
return code;
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(*ppEntry != NULL);
(*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
(*ppEntry)->index = index;
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
return code;
}
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
......@@ -277,7 +321,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogInRange = raftLogInRange;
pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
......@@ -285,6 +328,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
// pLogStore->syncLogInRange = raftLogInRange;
return pLogStore;
}
......
......@@ -148,9 +148,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SSyncRaftEntry* pEntry;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0);
if (pEntry != NULL) {
if (code == 0) {
ASSERT(pEntry != NULL);
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
ASSERT(pMsg != NULL);
......@@ -164,9 +165,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncEntryDestory(pEntry);
} else {
// no entry in log
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
ASSERT(pMsg != NULL);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
// no entry in log
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
ASSERT(pMsg != NULL);
} else {
syncNodeLog3("", pSyncNode);
ASSERT(0);
}
}
// prepare msg
......
......@@ -34,6 +34,7 @@ add_executable(syncWriteTest "")
add_executable(syncReplicateTest "")
add_executable(syncRefTest "")
add_executable(syncLogStoreCheck "")
add_executable(syncLogStoreCheck2 "")
add_executable(syncRaftCfgTest "")
add_executable(syncRespMgrTest "")
add_executable(syncSnapshotTest "")
......@@ -196,6 +197,10 @@ target_sources(syncLogStoreCheck
PRIVATE
"syncLogStoreCheck.cpp"
)
target_sources(syncLogStoreCheck2
PRIVATE
"syncLogStoreCheck2.cpp"
)
target_sources(syncRaftCfgTest
PRIVATE
"syncRaftCfgTest.cpp"
......@@ -442,6 +447,11 @@ target_include_directories(syncLogStoreCheck
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncLogStoreCheck2
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRaftCfgTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
......@@ -668,6 +678,10 @@ target_link_libraries(syncLogStoreCheck
sync
gtest_main
)
target_link_libraries(syncLogStoreCheck2
sync
gtest_main
)
target_link_libraries(syncRaftCfgTest
sync
gtest_main
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void init() {
int code = walInit();
assert(code == 0);
}
void cleanup() { walCleanUp(); }
SWal* createWal(char* path, int32_t vgId) {
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
SWal* pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
return pWal;
}
SSyncNode* createSyncNode(SWal* pWal) {
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->pWal = pWal;
return pSyncNode;
}
void usage(char* exe) { printf("usage: %s path vgId \n", exe); }
int main(int argc, char** argv) {
if (argc != 3) {
usage(argv[0]);
exit(-1);
}
char* path = argv[1];
int32_t vgId = atoi(argv[2]);
init();
SWal* pWal = createWal(path, vgId);
assert(pWal != NULL);
SSyncNode* pSyncNode = createSyncNode(pWal);
assert(pSyncNode != NULL);
SSyncLogStore* pLog = logStoreCreate(pSyncNode);
assert(pLog != NULL);
logStorePrint2((char*)"==syncLogStoreCheck==", pLog);
walClose(pWal);
logStoreDestory(pLog);
taosMemoryFree(pSyncNode);
cleanup();
return 0;
}
......@@ -69,6 +69,7 @@ void test1() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore);
if (gAssert) {
......@@ -88,6 +89,7 @@ void test1() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore);
if (gAssert) {
......@@ -110,6 +112,7 @@ void test2() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
......@@ -130,6 +133,7 @@ void test2() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore);
if (gAssert) {
......@@ -152,6 +156,7 @@ void test3() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore);
if (gAssert) {
......@@ -198,6 +203,7 @@ void test3() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore);
if (gAssert) {
......@@ -220,6 +226,7 @@ void test4() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
......@@ -257,6 +264,7 @@ void test4() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore);
if (gAssert) {
......@@ -279,6 +287,7 @@ void test5() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
......@@ -329,6 +338,7 @@ void test5() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore);
if (gAssert) {
......@@ -351,6 +361,7 @@ void test6() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
......@@ -401,6 +412,7 @@ void test6() {
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore);
if (gAssert) {
......
......@@ -259,6 +259,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int64_t code;
if (pRead->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
// TODO: check wal life
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册