提交 44e3a52a 编写于 作者: C Cary Xu

code optimization

上级 d5c36b6d
...@@ -421,6 +421,8 @@ bool tsdbNoProblem(STsdbRepo* pRepo); ...@@ -421,6 +421,8 @@ bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB // unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
int tsdbGetNumOfWaitCommit(STsdbRepo *pRepo, int *nNum);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -199,13 +199,19 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ...@@ -199,13 +199,19 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbGetNumOfWaitCommit(STsdbRepo *pRepo, int *nNum) {
if (sem_getvalue(&pRepo->readyToCommit, nNum) != 0) {
tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo));
return -1;
}
return 0;
}
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config); STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo) < 0) return -1;
} }
return 0; return 0;
} }
......
...@@ -36,6 +36,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -36,6 +36,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
static int32_t vnodeCheckWal(SVnodeObj *pVnode);
int32_t vnodeInitWrite(void) { int32_t vnodeInitWrite(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -167,6 +168,20 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -167,6 +168,20 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
return code; return code;
} }
static int32_t vnodeCheckWal(SVnodeObj *pVnode) {
// no need to check wal size to trigger commit if:
// 1) vnode in committing state;
// 2) other instance wait to commit;
int nWaitCommit = 0;
if (tsdbGetNumOfWaitCommit(pVnode->tsdb, &nWaitCommit) != 0) {
return -1;
}
if ((nWaitCommit > 0) && (pVnode->isCommiting == 0)) {
return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
return 0;
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
...@@ -181,9 +196,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -181,9 +196,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
ASSERT(code != 0); ASSERT(code != 0);
} }
// no need to set wal size to trigger commit if vnode in committing state if (((++pVnode->tblMsgVer) & 16383) == 0) { // lazy check
if ((((++pVnode->tblMsgVer) & 32767) == 0) && (pVnode->isCommiting == 0)) { // lazy check vnodeCheckWal(pVnode);
tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
} }
tsdbClearTableCfg(pCfg); tsdbClearTableCfg(pCfg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册