未验证 提交 2671718c 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20990 from taosdata/FIX/TD-23640-3.0

enh: adjust size limit of applyQ and negotiationWin
......@@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) {
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {
......
......@@ -547,6 +547,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
......
......@@ -285,8 +285,11 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4)
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......
......@@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir);
......
......@@ -378,7 +378,6 @@ struct SVnode {
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
SVCommitSched commitSched;
int64_t sync;
TdThreadMutex lock;
bool blocked;
......@@ -387,9 +386,6 @@ struct SVnode {
int32_t blockSec;
int64_t blockSeq;
SQHandle* pQuery;
#if 0
SRpcHandleInfo blockInfo;
#endif
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
......@@ -143,23 +143,13 @@ _exit:
return code;
}
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
}
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) {
needCommit =
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
((pVnode->inUse->size > 0) && atExit);
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit);
}
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;
......@@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
vnodeUpdCommitSched(pVnode);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
......
......@@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool
......
......@@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
......@@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
} else {
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
vnodeUpdCommitSched(pVnode);
}
#if BATCH_ENABLE
......
......@@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _err;
}
if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED;
sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(),
index, pBuf->commitIndex);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) {
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
......@@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
_err:
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
taosMsleep(1);
return -1;
}
......
......@@ -202,21 +202,22 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
// restore error code
terrno = errCode;
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode != NULL) {
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64
", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser,
bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum,
aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
}
}
......
......@@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册