提交 92e05b8e 编写于 作者: B Benguang Zhao

enh: schedule vnodeCommit uniformly distributed

上级 691b75ad
......@@ -86,6 +86,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
......
......@@ -332,6 +332,11 @@ struct STsdbKeepCfg {
int32_t keep2;
};
typedef struct SVCommitSched {
int64_t commitMs;
int64_t maxWaitMs;
} SVCommitSched;
struct SVnode {
char* path;
SVnodeCfg config;
......@@ -350,7 +355,7 @@ struct SVnode {
STQ* pTq;
SSink* pSink;
tsem_t canCommit;
int64_t commitMs;
SVCommitSched commitSched;
int64_t sync;
TdThreadMutex lock;
bool blocked;
......
......@@ -58,15 +58,22 @@ int vnodeBegin(SVnode *pVnode) {
return 0;
}
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = SYNC_VND_COMMIT_MAX_MS + (randNum % SYNC_VND_COMMIT_MAX_MS);
}
int vnodeShouldCommit(SVnode *pVnode) {
if (!pVnode->inUse || !osDataSpaceAvailable()) {
return false;
}
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pVnode->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pVnode->commitMs + SYNC_VND_COMMIT_MAX_MS < nowMs));
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
}
int vnodeShouldCommitOld(SVnode *pVnode) {
......@@ -306,7 +313,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
pVnode->commitMs = taosGetMonoTimestampMs();
vnodeUpdCommitSched(pVnode);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
......
......@@ -142,7 +142,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->path = (char *)&pVnode[1];
strcpy(pVnode->path, path);
pVnode->config = info.config;
pVnode->commitMs = taosGetMonoTimestampMs();
pVnode->state.committed = info.state.committed;
pVnode->state.commitTerm = info.state.commitTerm;
pVnode->state.commitID = info.state.commitID;
......@@ -158,6 +157,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool
......
......@@ -154,7 +154,7 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) {
vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId);
_out:
pVnode->commitMs = taosGetMonoTimestampMs();
vnodeUpdCommitSched(pVnode);
rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册