diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 7bd8d75dd14b7878450283c2b698bda90af2e47c..675ffca7346cce67446d171337d263253963f6e0 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -275,6 +275,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); + SSyncRaftEntry* pExist = NULL; + bool inBuf = true; if (index <= pBuf->commitIndex) { sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 @@ -306,10 +308,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } // check current in buffer - SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; + pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pExist != NULL) { ASSERT(pEntry->index == pExist->index); - if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, pNode, index); } else { @@ -317,14 +318,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); - SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; - ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); + SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index); + ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm)); ret = 0; goto _out; } } // update + ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; @@ -337,6 +339,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt _out: syncEntryDestroy(pEntry); + if (!inBuf) { + syncEntryDestroy(pExist); + pExist = NULL; + } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return ret; @@ -1003,6 +1009,16 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(toIndex == lastVer + 1); + // refill buffer on need + if (toIndex <= pBuf->startIndex) { + int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode); + if (ret < 0) { + sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr()); + return -1; + } + } + + ASSERT(pBuf->endIndex == toIndex); syncLogBufferValidate(pBuf); return 0; }