提交 91fb0eb0 编写于 作者: C Cary Xu

optimization

上级 530a7bf2
......@@ -421,7 +421,8 @@ bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
int tsdbGetValOfWaitCommit(STsdbRepo *pRepo, int *nVal);
// not commit if other instances in committing state or waiting to commit
bool tsdbIsNeedCommit(STsdbRepo *pRepo);
#ifdef __cplusplus
}
......
......@@ -185,6 +185,22 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0;
}
bool tsdbIsNeedCommit(STsdbRepo *pRepo) {
int nVal = 0;
if (sem_getvalue(&pRepo->readyToCommit, &nVal) != 0) {
tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo));
}
return nVal > 0;
}
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1;
}
return 0;
}
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
......@@ -194,23 +210,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
return 0;
}
int tsdbGetValOfWaitCommit(STsdbRepo *pRepo, int *nVal) {
if (sem_getvalue(&pRepo->readyToCommit, nVal) != 0) {
tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo));
return -1;
}
return 0;
}
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1;
if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1;
}
return 0;
}
......
......@@ -550,14 +550,13 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
}
if (status == TSDB_STATUS_COMMIT_OVER) {
pVnode->isCommiting = 0;
pVnode->isFull = 0;
pVnode->fversion = pVnode->cversion;
vInfo("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal);
}
// vnodeGetVersion() and calling tsdbCheckWal() would reply on the vnode isCommiting state
pVnode->isCommiting = 0;
return vnodeSaveVersion(pVnode);
}
......
......@@ -169,14 +169,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
}
static int32_t vnodeCheckWal(SVnodeObj *pVnode) {
int nVal = 0;
if (tsdbGetValOfWaitCommit(pVnode->tsdb, &nVal) != 0) {
return -1;
}
// no need to check wal size to trigger commit if:
// 1) have instances waiting to commit;
// or 2) vnode in committing state;
if ((nVal > 0) && (pVnode->isCommiting == 0)) {
if (tsdbIsNeedCommit(pVnode->tsdb)) {
return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册