提交 a2181081 编写于 作者: B Benguang Zhao

fix: allow to rollback sync log buffer beyond startIndex with refill

上级 75b70718
...@@ -275,6 +275,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -275,6 +275,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncIndex index = pEntry->index; SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1; SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
SSyncRaftEntry* pExist = NULL;
bool inBuf = true;
if (index <= pBuf->commitIndex) { if (index <= pBuf->commitIndex) {
sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64
...@@ -306,10 +308,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -306,10 +308,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
} }
// check current in buffer // check current in buffer
SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
if (pExist != NULL) { if (pExist != NULL) {
ASSERT(pEntry->index == pExist->index); ASSERT(pEntry->index == pExist->index);
if (pEntry->term != pExist->term) { if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, pNode, index); (void)syncLogBufferRollback(pBuf, pNode, index);
} else { } else {
...@@ -317,14 +318,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -317,14 +318,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; SyncTerm existPrevTerm = syncLogReplMgrGetPrevLogTerm(NULL, pNode, index);
ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
ret = 0; ret = 0;
goto _out; goto _out;
} }
} }
// update // update
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL; pEntry = NULL;
pBuf->entries[index % pBuf->size] = tmp; pBuf->entries[index % pBuf->size] = tmp;
...@@ -337,6 +339,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -337,6 +339,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
_out: _out:
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
if (!inBuf) {
syncEntryDestroy(pExist);
pExist = NULL;
}
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return ret; return ret;
...@@ -1003,6 +1009,16 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex ...@@ -1003,6 +1009,16 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(toIndex == lastVer + 1); ASSERT(toIndex == lastVer + 1);
// refill buffer on need
if (toIndex <= pBuf->startIndex) {
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
if (ret < 0) {
sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
return -1;
}
}
ASSERT(pBuf->endIndex == toIndex);
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册