diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f836cd76acb25e4aa93e7e47bcc4176ddf9788ca..a0f02d96f91913bbe127b992052ecd03a2008b61 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; - tsNumOfVnodeSyncThreads = tsNumOfCores; + // tsNumOfVnodeSyncThreads = tsNumOfCores; + tsNumOfVnodeSyncThreads = 32; tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 65ec77e38ff10ff77de1d4000515439ad7844ef9..ff59189a9d6a96566e3246a5ed8fa25ca819e440 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); +SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore); + // for debug void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint2(char* s, SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index f31f3dd1aea037b36846cf0fda55a564220e12fb..4f93d8197dc801ae86619858b15d9055059565eb 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); - syncNodeEventLog(ths, eventLog); - logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); - return code; } +// if FromIndex > walCommitVer, return 0 +// else return num of pass entries static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { - int32_t code; + int32_t code = 0; + int32_t pass = 0; SyncIndex delBegin = FromIndex; SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); @@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { } } + // update delete begin + SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore); + + if (delBegin <= walCommitVer) { + delBegin = walCommitVer + 1; + pass = walCommitVer - delBegin + 1; + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "update delete begin to %ld", delBegin); + syncNodeEventLog(ths, logBuf); + } while (0); + } + // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); - syncNodeEventLog(ths, eventLog); - logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "make log same from:%ld, delbegin:%ld, pass:%d", FromIndex, delBegin, pass); + syncNodeEventLog(ths, logBuf); + } while (0); - return code; + return pass; } int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { @@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { - // make log same - do { - SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); - bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; + int32_t pass = 0; + SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; - if (hasExtraEntries) { - // make log same, rollback deleted entries - code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(code == 0); - } - - } while (0); + // make log same + if (hasExtraEntries) { + // make log same, rollback deleted entries + pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); + ASSERT(pass >= 0); + } // append entry batch - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } + if (pass == 0) { + // assert! no batch + ASSERT(pMsg->dataCount <= 1); + + for (int32_t i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); + code = syncNodePreCommit(ths, pAppendEntry, 0); + ASSERT(code == 0); - // syncEntryDestory(pAppendEntry); + // syncEntryDestory(pAppendEntry); + } } // fsync once @@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); + int32_t pass = 0; + if (hasExtraEntries) { // make log same, rollback deleted entries - code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); - ASSERT(code == 0); + pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); + ASSERT(pass >= 0); } if (hasAppendEntries) { // append entry batch - for (int32_t i = 0; i < pMsg->dataCount; ++i) { - SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - if (code != 0) { - return -1; - } + if (pass == 0) { + // assert! no batch + ASSERT(pMsg->dataCount <= 1); + + // append entry batch + for (int32_t i = 0; i < pMsg->dataCount; ++i) { + SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } - code = syncNodePreCommit(ths, pAppendEntry, 0); - ASSERT(code == 0); + code = syncNodePreCommit(ths, pAppendEntry, 0); + ASSERT(code == 0); - // syncEntryDestory(pAppendEntry); + // syncEntryDestory(pAppendEntry); + } } // fsync once diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a603cfff2762bbd4fea088c9a4ad120d0471fce0..3a94ed9713ba2b12e2ce766b3dfd9e615b309d9f 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -92,6 +92,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // advance commit index as large as possible + SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore); + if (walCommitVer > newCommitIndex) { + newCommitIndex = walCommitVer; + } + // maybe execute fsm if (newCommitIndex > pSyncNode->commitIndex) { SyncIndex beginIndex = pSyncNode->commitIndex + 1; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 78004c0ad601b403ce1046ae41ac6ff995c6e7f9..c17d91182e0f90cbec32766373da5958aff6b31b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; + + syncNodeEventLog(pSyncNode, "eq hb timer"); + if (pSyncNode->replicaNum > 1) { if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b575e40d86884c9fdd688db03e4cc8492e9ea0d3..0649e064e45391cfe9082c24264a33b762d1a279 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, return code; } +// truncate semantic static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - int32_t code = walRollback(pWal, fromIndex); + + // need not truncate + SyncIndex wallastVer = walGetLastVer(pWal); + if (fromIndex > wallastVer) { + return 0; + } + + int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn // event log do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex); + snprintf(logBuf, sizeof(logBuf), "log truncate, from-index:%" PRId64, fromIndex); syncNodeEventLog(pData->pSyncNode, logBuf); } while (0); @@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { return walGetFirstVer(pWal); } +SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + return walGetCommittedVer(pWal); +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore);