未验证 提交 bf039659 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #20413 from taosdata/FIX/TD-22564-main

enh: not allow to propose if fsm applying progress lagging behind too far
......@@ -143,10 +143,11 @@ typedef struct SSyncFSM {
void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
......
......@@ -542,7 +542,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) //
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -113,6 +113,7 @@ typedef struct SMnode {
bool deploy;
char *path;
int64_t checkTime;
SyncIndex applied;
SSdb *pSdb;
SArray *pSteps;
SQHandle *pQuery;
......
......@@ -380,11 +380,13 @@ static int32_t mndInitSdb(SMnode *pMnode) {
}
static int32_t mndOpenSdb(SMnode *pMnode) {
int32_t code = 0;
if (!pMnode->deploy) {
return sdbReadFile(pMnode->pSdb);
} else {
return 0;
code = sdbReadFile(pMnode->pSdb);
}
atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex);
return code;
}
static void mndCleanupSdb(SMnode *pMnode) {
......
......@@ -129,6 +129,14 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = 0;
pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term;
if (pMsg->code == 0) {
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
}
if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out;
}
......@@ -140,6 +148,11 @@ _out:
return code;
}
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
SMnode *pMnode = pFSM->data;
return atomic_load_64(&pMnode->applied);
}
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data;
......@@ -153,7 +166,7 @@ static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
}
void mndRestoreFinish(const SSyncFSM *pFsm) {
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) {
......@@ -167,6 +180,8 @@ void mndRestoreFinish(const SSyncFSM *pFsm) {
} else {
mInfo("vgId:1, sync restore finished");
}
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
}
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
......@@ -253,6 +268,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
......
......@@ -306,13 +306,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
void *pReq;
int32_t len;
int32_t ret;
/*
if (!pVnode->inUse) {
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
return -1;
}
*/
if (version <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied);
......@@ -326,8 +320,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applied + 1 == version);
pVnode->state.applied = version;
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
atomic_store_64(&pVnode->state.applied, version);
atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
......
......@@ -433,7 +433,23 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
}
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
if (pMsg->code == 0) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
}
const STraceId *trace = &pMsg->info.traceId;
SVnode *pVnode = pFsm->data;
vnodePostBlockMsg(pVnode, pMsg);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, TD_VID(pVnode), pMsg, rsp.code, pMeta->index);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return 0;
}
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
......@@ -443,6 +459,11 @@ static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const
return 0;
}
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
SVnode *pVnode = pFSM->data;
return atomic_load_64(&pVnode->state.applied);
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
......@@ -505,21 +526,26 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p
return code;
}
static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SVnode *pVnode = pFsm->data;
SyncIndex appliedIdx = -1;
do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
if (itemSize == 0) {
vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId);
appliedIdx = vnodeSyncAppliedIndex(pFsm);
ASSERT(appliedIdx <= commitIdx);
if (appliedIdx == commitIdx) {
vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId);
break;
} else {
vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize);
vInfo("vgId:%d, restore not finish since %" PRId64 " items to be applied. commit-index:%" PRId64
", applied-index:%" PRId64,
pVnode->config.vgId, commitIdx - appliedIdx, commitIdx, appliedIdx);
taosMsleep(10);
}
} while (true);
walApplyVer(pVnode->pWal, pVnode->state.applied);
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true;
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
......@@ -569,6 +595,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
......
......@@ -2160,8 +2160,8 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL;
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL);
ASSERT(terrno != 0);
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
syncEntryDestroy(pEntry);
return -1;
}
......
......@@ -48,7 +48,16 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncIndex index = pEntry->index;
if (index - pBuf->startIndex >= pBuf->size) {
sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index);
terrno = TSDB_CODE_SYN_BUFFER_FULL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
goto _err;
}
......@@ -475,7 +484,7 @@ _out:
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) {
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
return 0;
}
......@@ -587,10 +596,10 @@ _out:
// mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
}
if (!inBuf) {
......
......@@ -423,6 +423,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册