未验证 提交 56cf8863 编写于 作者: Shengliang Guan 提交者: GitHub

Merge pull request #19160 from taosdata/enh/TD-21536

fix: handle error if sync buffer is full
...@@ -520,6 +520,7 @@ int32_t* taosGetErrno(); ...@@ -520,6 +520,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) // sync buffer is full
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq
......
...@@ -391,9 +391,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm ...@@ -391,9 +391,9 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s", ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
} }
......
...@@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, ...@@ -109,6 +109,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -2386,7 +2386,11 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand ...@@ -2386,7 +2386,11 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer // append to log buffer
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer. index:%" PRId64 "", ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL;
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, ths->pRaftStore->currentTerm, pEntry,
TSDB_CODE_SYN_BUFFER_FULL);
syncEntryDestroy(pEntry);
return -1; return -1;
} }
...@@ -2685,8 +2689,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn ...@@ -2685,8 +2689,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
} }
int32_t code = syncNodeAppend(ths, pEntry); int32_t code = syncNodeAppend(ths, pEntry);
if (code < 0 && ths->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) { if (code < 0) {
ASSERTS(false, "failed to append blocking msg"); sNError(ths, "failed to append blocking msg");
} }
return code; return code;
} }
......
...@@ -26,6 +26,11 @@ ...@@ -26,6 +26,11 @@
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "syncUtil.h" #include "syncUtil.h"
// Returns true when `type` is one of the vnode request types that the sync
// log code reports as a "blocking msg": table create/alter/drop, tag-value
// update, or alter-confirm. Any other message type yields false.
static bool syncIsMsgBlock(tmsg_t type) {
  // Checks run in a fixed order and stop at the first match.
  if (type == TDMT_VND_CREATE_TABLE) return true;
  if (type == TDMT_VND_ALTER_TABLE) return true;
  if (type == TDMT_VND_DROP_TABLE) return true;
  if (type == TDMT_VND_UPDATE_TAG_VAL) return true;
  if (type == TDMT_VND_ALTER_CONFIRM) return true;
  return false;
}
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
int64_t index = pBuf->endIndex; int64_t index = pBuf->endIndex;
...@@ -441,26 +446,25 @@ _out: ...@@ -441,26 +446,25 @@ _out:
return matchIndex; return matchIndex;
} }
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
ASSERTS(pFsm->FpCommitCb != NULL, "No commit cb registered for the FSM"); int32_t applyCode) {
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
return 0; return 0;
} }
if (pNode->vgId != 1 && vnodeIsMsgBlock(pEntry->originalRpcType)) { if (pNode->vgId != 1 && syncIsMsgBlock(pEntry->originalRpcType)) {
sTrace("vgId:%d, blocking msg ready to execute. index:%" PRId64 ", term: %" PRId64 ", type: %s", pNode->vgId, sTrace("vgId:%d, blocking msg ready to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x",
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
} }
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak; cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0; cbMeta.code = applyCode;
cbMeta.state = role; cbMeta.state = role;
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term; cbMeta.term = pEntry->term;
...@@ -469,7 +473,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn ...@@ -469,7 +473,6 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
ASSERT(rpcMsg.pCont == NULL);
return code; return code;
} }
...@@ -520,7 +523,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -520,7 +523,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pEntry->term, TMSG_INFO(pEntry->originalRpcType));
} }
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) { if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role: %d, current term: %" PRId64, ", role: %d, current term: %" PRId64,
vgId, pEntry->index, pEntry->term, role, term); vgId, pEntry->index, pEntry->term, role, term);
......
...@@ -407,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st ...@@ -407,6 +407,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for st
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq //tq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册