未验证 提交 363a4f38 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15821 from taosdata/feature/3.0_mhli

refactor(sync): make leader life longer
......@@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores;
// tsNumOfVnodeSyncThreads = tsNumOfCores;
tsNumOfVnodeSyncThreads = 32;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
......
......@@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
......
......@@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
}
// if FromIndex > walCommitVer, return 0
// else return num of pass entries
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
int32_t code;
int32_t code = 0;
int32_t pass = 0;
SyncIndex delBegin = FromIndex;
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
......@@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
}
}
// update delete begin
SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore);
if (delBegin <= walCommitVer) {
delBegin = walCommitVer + 1;
pass = walCommitVer - delBegin + 1;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "update delete begin to %ld", delBegin);
syncNodeEventLog(ths, logBuf);
} while (0);
}
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "make log same from:%ld, delbegin:%ld, pass:%d", FromIndex, delBegin, pass);
syncNodeEventLog(ths, logBuf);
} while (0);
return code;
return pass;
}
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
......@@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// make log same
do {
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
int32_t pass = 0;
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
if (hasExtraEntries) {
// make log same, rollback deleted entries
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0);
}
} while (0);
// make log same
if (hasExtraEntries) {
// make log same, rollback deleted entries
pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(pass >= 0);
}
// append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
if (pass == 0) {
// assert! no batch
ASSERT(pMsg->dataCount <= 1);
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
// syncEntryDestory(pAppendEntry);
}
}
// fsync once
......@@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
int32_t pass = 0;
if (hasExtraEntries) {
// make log same, rollback deleted entries
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0);
pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(pass >= 0);
}
if (hasAppendEntries) {
// append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
if (pass == 0) {
// assert! no batch
ASSERT(pMsg->dataCount <= 1);
// append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
// syncEntryDestory(pAppendEntry);
}
}
// fsync once
......
......@@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
// advance commit index as large as possible
SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
if (walCommitVer > newCommitIndex) {
newCommitIndex = walCommitVer;
}
// maybe execute fsm
if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
......
......@@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodeEventLog(pSyncNode, "eq hb timer");
if (pSyncNode->replicaNum > 1) {
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
......
......@@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return code;
}
// truncate semantic
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int32_t code = walRollback(pWal, fromIndex);
// need not truncate
SyncIndex wallastVer = walGetLastVer(pWal);
if (fromIndex > wallastVer) {
return 0;
}
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......@@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex);
snprintf(logBuf, sizeof(logBuf), "log truncate, from-index:%" PRId64, fromIndex);
syncNodeEventLog(pData->pSyncNode, logBuf);
} while (0);
......@@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
return walGetFirstVer(pWal);
}
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walGetCommittedVer(pWal);
}
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册