提交 b336971e 编写于 作者: C Cary Xu

refactor

上级 e65c33eb
...@@ -418,7 +418,8 @@ int tsdbCompact(STsdbRepo *pRepo); ...@@ -418,7 +418,8 @@ int tsdbCompact(STsdbRepo *pRepo);
// no problem return true // no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo); bool tsdbNoProblem(STsdbRepo* pRepo);
void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize); // unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -94,8 +94,6 @@ struct STsdbRepo { ...@@ -94,8 +94,6 @@ struct STsdbRepo {
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool repoLocked; bool repoLocked;
int32_t code; // Commit code int32_t code; // Commit code
int64_t commitWalSize; // last commit wal size. MB
int64_t walSize; // current wal size. MB
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
...@@ -111,7 +109,7 @@ struct STsdbRepo { ...@@ -111,7 +109,7 @@ struct STsdbRepo {
int tsdbLockRepo(STsdbRepo* pRepo); int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo, bool checkWal); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
......
...@@ -185,14 +185,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { ...@@ -185,14 +185,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0; return 0;
} }
void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize) { // MB int tsdbCheckCommit(STsdbRepo *pRepo) {
atomic_store_64(&pRepo->walSize,walSize);
if(walSize <= atomic_load_64(&pRepo->commitWalSize)) {
atomic_store_64(&pRepo->commitWalSize, 0); // restart
}
}
int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) {
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config); STsdbCfg *pCfg = &(pRepo->config);
...@@ -202,14 +195,15 @@ int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) { ...@@ -202,14 +195,15 @@ int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) {
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit // trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo) < 0) return -1;
} else if (checkWal) { }
int64_t walSize = atomic_load_64(&pRepo->walSize); return 0;
int64_t commitWalSize = atomic_load_64(&pRepo->commitWalSize); }
int64_t delta = walSize - commitWalSize;
if ((delta > tsdbWalFlushSize) && (delta > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
atomic_store_64(&pRepo->commitWalSize, walSize); STsdbCfg *pCfg = &(pRepo->config);
if (tsdbAsyncCommit(pRepo) < 0) return -1;
} if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1;
} }
return 0; return 0;
......
...@@ -77,7 +77,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR ...@@ -77,7 +77,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo, false) < 0) return -1; if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
} }
......
...@@ -140,7 +140,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { ...@@ -140,7 +140,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
goto _err; goto _err;
} }
if (tsdbCheckCommit(pRepo, true) < 0) return -1; if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
...@@ -191,7 +191,7 @@ int tsdbDropTable(STsdbRepo *repo, STableId tableId) { ...@@ -191,7 +191,7 @@ int tsdbDropTable(STsdbRepo *repo, STableId tableId) {
tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
free(tbname); free(tbname);
if (tsdbCheckCommit(pRepo, false) < 0) goto _err; if (tsdbCheckCommit(pRepo) < 0) goto _err;
return 0; return 0;
...@@ -416,7 +416,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -416,7 +416,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
} }
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
if (tsdbCheckCommit(pRepo, false) < 0) return -1; if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
} }
......
...@@ -56,6 +56,7 @@ typedef struct { ...@@ -56,6 +56,7 @@ typedef struct {
uint64_t version; // current version uint64_t version; // current version
uint64_t cversion; // version while commit start uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
uint32_t tblMsgVer; // create table msg version
void * wqueue; // write queue void * wqueue; // write queue
void * qqueue; // read query queue void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue void * fqueue; // read fetch/cancel queue
......
...@@ -556,7 +556,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -556,7 +556,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
if (!vnodeInInitStatus(pVnode)) { if (!vnodeInInitStatus(pVnode)) {
walRemoveOneOldFile(pVnode->wal); walRemoveOneOldFile(pVnode->wal);
} }
// vnodeGetVersion() and calling tsdbSetWalSize() would reply on the vnode isCommiting state // vnodeGetVersion() and calling tsdbCheckWal() would reply on the vnode isCommiting state
pVnode->isCommiting = 0; pVnode->isCommiting = 0;
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }
......
...@@ -176,17 +176,16 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -176,17 +176,16 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
return terrno; return terrno;
} }
// no need to set wal size to trigger commit if vnode in committing state
// TODO: retrieve pVnode->isCommiting need atomic operation?
if (((pVnode->version & 8191) == 0) && (pVnode->isCommiting == 0)) {
tsdbSetWalSize(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) { if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) {
code = terrno; code = terrno;
ASSERT(code != 0); ASSERT(code != 0);
} }
// no need to set wal size to trigger commit if vnode in committing state
if ((((++pVnode->tblMsgVer) & 32767) == 0) && (pVnode->isCommiting == 0)) { // lazy check
tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
tsdbClearTableCfg(pCfg); tsdbClearTableCfg(pCfg);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册