未验证 提交 15b3ca43 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18656 from taosdata/FIX/TD-19334-3.0

fix: get result of tmsPutToQueue for FpCommitCb
......@@ -138,8 +138,8 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM {
void* data;
void (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
......
......@@ -72,7 +72,7 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code;
}
void mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
......@@ -114,12 +114,15 @@ void mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *p
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
}
}
return 0;
}
void mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
mndProcessWriteMsg(pFsm, pMsg, pMeta);
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return code;
}
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
......
......@@ -363,7 +363,7 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
return 0;
}
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term;
......@@ -374,17 +374,18 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbM
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) {
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
return 0;
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
......
......@@ -107,7 +107,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// private
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
#ifdef __cplusplus
}
......
......@@ -310,7 +310,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
ASSERT(pEntry->index == pExist->index);
if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, index);
(void)syncLogBufferRollback(pBuf, pNode, index);
} else {
sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
......@@ -457,9 +457,9 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
ASSERT(rpcMsg.pCont == NULL);
return 0;
return code;
}
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
......@@ -516,8 +516,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
}
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
sError("vgId:%d, failed to execute sync log entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
pEntry->index, pEntry->term);
sError("vgId:%d, failed to commit sync log entry. index:%" PRId64 ", term:%" PRId64
", role: %d, current term: %" PRId64,
vgId, pEntry->index, pEntry->term, role, term);
goto _out;
}
pBuf->commitIndex = index;
......@@ -971,14 +972,18 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
return;
}
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) {
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
SyncIndex index = pBuf->endIndex - 1;
while (index >= toIndex) {
SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem;
if (pEntry != NULL) {
syncEntryDestroy(pEntry);
(void)syncEntryDestroy(pEntry);
pEntry = NULL;
memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
}
......@@ -996,7 +1001,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERT(lastVer == pBuf->matchIndex);
SyncIndex index = pBuf->endIndex - 1;
(void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1);
(void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1);
sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册