未验证 提交 b70531d1 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14186 from taosdata/feature/3.0_mhli

refactor(sync): refactor wal abstraction
......@@ -157,13 +157,13 @@ typedef struct SSyncLogStore {
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// refactor, log[0 .. n] ==> log[m .. n]
int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
int32_t (*syncLogResetBeginIndex)(struct SSyncLogStore* pLogStore);
// int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
// bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
......
......@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 500
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 100
#define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
......@@ -26,11 +26,13 @@ extern "C" {
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
#include "wal.h"
typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
SSyncNode* pSyncNode;
SWal* pWal;
SWalReadHandle* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
......
......@@ -420,44 +420,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// prevLogIndex == -1
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
if (gRaftDetailLog) {
sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex);
}
return true;
}
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
if (gRaftDetailLog) {
sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex,
myLastIndex);
}
sDebug("vgId:%d sync log not ok, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (myPreLogTerm == SYNC_TERM_INVALID) {
sError("vgId:%d sync get pre term error, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
sDebug("vgId:%d sync log not ok2, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
if (gRaftDetailLog) {
sTrace(
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
}
return true;
}
if (gRaftDetailLog) {
sTrace(
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu",
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
}
sDebug("vgId:%d sync log not ok3, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
return false;
}
......@@ -466,14 +448,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
int32_t code = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncAppendEntries maybe replica already dropped");
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
return ret;
}
......@@ -497,7 +476,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
sTrace("recv SyncAppendEntries, candidate to follower");
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
......@@ -505,6 +484,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
} while (0);
#if 0
// fake match
//
// condition1:
......@@ -530,7 +510,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths);
bool condition1 =
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex); // donot use syncLogEntryCount!!! use isEmpty
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
(pMsg->prevLogIndex > myLastIndex);
bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex);
......@@ -538,11 +518,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2 || condition3 || condition4;
if (condition) {
sTrace(
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
"condition1:%d, condition2:%d, condition3:%d, condition4:%d",
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3, condition4);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, fake match, pre-index:%ld, pre-term:%lu",
pMsg->prevLogIndex, pMsg->prevLogTerm);
syncNodeEventLog(ths, logBuf);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -562,6 +541,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
} while (0);
#endif
// fake match2
//
......@@ -576,8 +556,13 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
ths->commitIndex);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex,
pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
......@@ -605,6 +590,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
// update match index
matchIndex = pMsg->prevLogIndex + 1;
syncEntryDestory(pAppendEntry);
......@@ -650,11 +636,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2;
if (condition) {
sTrace(
"recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, "
"condition2:%d, logOK:%d",
ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore),
condition1, condition2, logOK);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -693,8 +678,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex,
hasExtraEntries, hasAppendEntries);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
if (hasExtraEntries) {
// make log same, rollback deleted entries
......
......@@ -101,32 +101,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId,
ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncAppendEntriesReply, maybe replica already dropped");
return ret;
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term);
syncNodeEventLog(ths, logBuf);
return 0;
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
......@@ -134,12 +129,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu",
pMsg->term, ths->pRaftStore->currentTerm);
syncNodeLog2(logBuf, ths);
sError("%s", logBuf);
return ret;
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
syncNodeErrorLog(ths, logBuf);
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
......@@ -228,8 +221,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
return ret;
}
\ No newline at end of file
......@@ -416,7 +416,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -433,8 +433,9 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sTrace("vgId:%d, sync get snapshot last config index, index:%ld lcindex:%ld", pSyncNode->vgId, snapshotLastApplyIndex,
lastIndex);
sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex);
return lastIndex;
}
......@@ -1310,6 +1311,10 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
char* printStr = "";
if (pCfgStr != NULL) {
printStr = pCfgStr;
}
if (userStrLen < 256) {
char logBuf[256 + 256];
......@@ -1321,7 +1326,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr);
pSyncNode->changing, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1338,7 +1343,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pCfgStr);
pSyncNode->changing, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -1957,17 +1962,21 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "sync node get pre term error, index:%ld", index);
syncNodeErrorLog(pSyncNode, logBuf);
} while (0);
return SYNC_TERM_INVALID;
}
......
......@@ -107,26 +107,30 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
}
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
if (p + 128 + 32 > s + len) {
break;
if (pSyncCfg != NULL) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
if (p + 128 + 32 > s + len) {
break;
}
*/
char buf[128 + 32];
snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
strncpy(p, buf, sizeof(buf));
p = s + strlen(s);
}
*/
char buf[128 + 32];
snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
strncpy(p, buf, sizeof(buf));
p = s + strlen(s);
strcpy(p - 2, "}");
return s;
}
strcpy(p - 2, "}");
return s;
return NULL;
}
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
......
......@@ -16,10 +16,10 @@
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
// static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
......@@ -45,31 +45,43 @@ static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncI
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
// refactor, log[0 .. n] ==> log[m .. n]
/*
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex);
// if beginIndex == 0, donot need call this funciton
ASSERT(beginIndex > 0);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
sTrace("vgId:%d, reset wal begin index:%ld", pData->pSyncNode->vgId, beginIndex);
pData->beginIndex = beginIndex;
walRestoreFromSnapshot(pWal, beginIndex - 1);
return 0;
}
*/
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRestoreFromSnapshot(pWal, snapshotIndex);
return 0;
}
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return pData->beginIndex;
SyncIndex firstVer = walGetFirstVer(pWal);
return firstVer;
}
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); }
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
return (endIndex < beginIndex);
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walIsEmpty(pWal);
}
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
......@@ -96,23 +108,8 @@ static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastVer = walGetLastVer(pWal);
SyncIndex firstVer = walGetFirstVer(pWal);
if (lastVer < firstVer) {
// no record
lastIndex = -1;
} else {
if (firstVer >= 0) {
lastIndex = lastVer;
} else if (firstVer == -1) {
lastIndex = -1;
} else {
ASSERT(0);
}
}
return lastIndex;
return lastVer;
}
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
......@@ -122,6 +119,26 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
return lastVer + 1;
}
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
if (walIsEmpty(pWal)) {
return 0;
} else {
SSyncRaftEntry* pLastEntry;
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
ASSERT(code == 0);
ASSERT(pLastEntry != NULL);
SyncTerm lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
return lastTerm;
}
return 0;
}
/*
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
if (raftLogEntryCount(pLogStore) == 0) {
......@@ -137,6 +154,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
}
return lastTerm;
}
*/
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
......@@ -160,8 +178,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
pEntry->index, err, err, errStr, sysErr, sysErrStr);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
......@@ -229,7 +251,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
return -1;
}
......@@ -240,12 +263,23 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index,
err, err, errStr, sysErr, sysErrStr);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err,
err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf);
} else {
syncNodeErrorLog(pData->pSyncNode, logBuf);
}
} while (0);
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return code;
}
......@@ -261,9 +295,11 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return code;
}
......@@ -285,6 +321,25 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return code;
}
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
ASSERT(ppLastEntry != NULL);
*ppLastEntry = NULL;
if (walIsEmpty(pWal)) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
} else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
return -1;
}
/*
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
*ppLastEntry = NULL;
if (raftLogEntryCount(pLogStore) == 0) {
......@@ -294,6 +349,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
*/
//-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
......@@ -306,16 +362,22 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL);
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL);
/*
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
*/
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
......@@ -325,7 +387,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
// pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
pLogStore->syncLogRestoreFromSnapshot = raftLogRestoreFromSnapshot;
pLogStore->syncLogBeginIndex = raftLogBeginIndex;
pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
......@@ -344,6 +407,11 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data;
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
}
taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore);
}
......@@ -368,8 +436,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
pEntry->index, err, err, errStr, sysErr, sysErrStr);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
......@@ -389,7 +460,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWal* pWal = pData->pWal;
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index);
......@@ -398,12 +470,20 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
index, err, err, errStr, sysErr, sysErrStr);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
err, err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
syncNodeEventLog(pData->pSyncNode, logBuf);
} else {
syncNodeErrorLog(pData->pSyncNode, logBuf);
}
} while (0);
ASSERT(0);
}
// ASSERT(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(pEntry != NULL);
......@@ -417,9 +497,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
/*
int32_t saveErr = terrno;
walCloseReadHandle(pWalHandle);
terrno = saveErr;
*/
return pEntry;
......@@ -498,6 +580,7 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
return pEntry;
}
/*
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
......@@ -544,6 +627,57 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
*/
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
if (pData != NULL && pData->pWal != NULL) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
if (!raftLogIsEmpty(pLogStore)) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
}
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
char* logStore2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStore2Json(pLogStore);
......@@ -563,7 +697,8 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
......
......@@ -219,6 +219,17 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
do {
char host[128];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug(
"vgId:%d, send sync-append-entries to %s:%d, term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
"datalen:%d",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen);
} while (0);
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
......
......@@ -545,7 +545,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
......
......@@ -113,7 +113,8 @@ void test2() {
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
if (gAssert) {
......@@ -228,7 +229,8 @@ void test4() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -289,7 +291,8 @@ void test5() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -363,7 +366,8 @@ void test6() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
int32_t dataLen = 10;
......@@ -405,14 +409,32 @@ void test6() {
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
logStoreDestory(pLogStore);
cleanup();
// restart
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore);
if (gAssert) {
......@@ -432,17 +454,20 @@ void test6() {
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
gRaftDetailLog = true;
if (argc == 2) {
gAssert = atoi(argv[1]);
}
sTrace("gAssert : %d", gAssert);
/*
test1();
test2();
test3();
test4();
test5();
*/
test6();
return 0;
......
......@@ -312,7 +312,8 @@ void test5() {
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5);
for (int i = 6; i <= 10; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
......@@ -372,6 +373,7 @@ void test5() {
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
gRaftDetailLog = true;
if (argc == 2) {
gAssert = atoi(argv[1]);
......
......@@ -272,14 +272,23 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
}
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pRead->pWal->vers.firstVer,
pRead->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
}
return -1;
}
......@@ -304,8 +313,10 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
else
else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
}
return -1;
}
......
......@@ -150,6 +150,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(code == 0);
if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (head.head.version != ver) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册