提交 f227df66 编写于 作者: C Cary Xu

[TS-385]<enhance>: support flush wal by walFlushSize

上级 6b7e4a85
......@@ -278,9 +278,9 @@ do { \
#define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_WAL_FLUSH_SIZE 0
#define TSDB_MAX_WAL_FLUSH_SIZE 10000000
#define TSDB_DEFAULT_WAL_FLUSH_SIZE 1024 // 1024MB
#define TSDB_MIN_WAL_FLUSH_SIZE 128 // MB
#define TSDB_MAX_WAL_FLUSH_SIZE 10000000 // MB
#define TSDB_DEFAULT_WAL_FLUSH_SIZE 1024 // MB
#define TSDB_MIN_TABLES 4
#define TSDB_MAX_TABLES 10000000
......
......@@ -418,6 +418,8 @@ int tsdbCompact(STsdbRepo *pRepo);
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize);
#ifdef __cplusplus
}
#endif
......
......@@ -66,6 +66,7 @@ int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
uint64_t walGetVersion(twalh);
void walResetVersion(twalh, uint64_t newVer);
int64_t walGetFSize(twalh);
#ifdef __cplusplus
}
......
......@@ -94,6 +94,8 @@ struct STsdbRepo {
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
int64_t commitWalSize; // last commit wal size. MB
int64_t walSize; // current wal size. MB
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
......@@ -109,7 +111,7 @@ struct STsdbRepo {
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo, bool checkWal);
int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
void tsdbGetRootDir(int repoid, char dirName[]);
......
......@@ -185,7 +185,14 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0;
}
int tsdbCheckCommit(STsdbRepo *pRepo) {
void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize) { // MB
atomic_store_64(&pRepo->walSize,walSize);
if(walSize <= atomic_load_64(&pRepo->commitWalSize)) {
atomic_store_64(&pRepo->commitWalSize, 0); // restart
}
}
int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
......@@ -195,6 +202,14 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
} else if (checkWal) {
int64_t walSize = atomic_load_64(&pRepo->walSize);
int64_t commitWalSize = atomic_load_64(&pRepo->commitWalSize);
int64_t delta = walSize - commitWalSize;
if ((delta > tsdbWalFlushSize) && (delta > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
atomic_store_64(&pRepo->commitWalSize, walSize);
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
}
return 0;
......
......@@ -77,7 +77,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo) < 0) return -1;
if (tsdbCheckCommit(pRepo, false) < 0) return -1;
return 0;
}
......
......@@ -140,7 +140,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
goto _err;
}
if (tsdbCheckCommit(pRepo) < 0) return -1;
if (tsdbCheckCommit(pRepo, true) < 0) return -1;
return 0;
......@@ -191,7 +191,7 @@ int tsdbDropTable(STsdbRepo *repo, STableId tableId) {
tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
free(tbname);
if (tsdbCheckCommit(pRepo) < 0) goto _err;
if (tsdbCheckCommit(pRepo, false) < 0) goto _err;
return 0;
......@@ -416,7 +416,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
}
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
if (tsdbCheckCommit(pRepo) < 0) return -1;
if (tsdbCheckCommit(pRepo, false) < 0) return -1;
return 0;
}
......
......@@ -37,6 +37,7 @@ int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
int32_t tfStat(int64_t tfd, struct stat *pFstat);
#ifdef __cplusplus
}
......
......@@ -133,3 +133,14 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
int32_t tfStat(int64_t tfd, struct stat *pFstat) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int32_t code = fstat(fd, pFstat);
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
......@@ -176,6 +176,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
return terrno;
}
if ((pVnode->version & 8191) == 0) {
tsdbSetWalSize(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) {
code = terrno;
ASSERT(code != 0);
......
......@@ -576,4 +576,14 @@ void walResetVersion(twalh param, uint64_t newVer) {
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
pWal->version = newVer;
}
int64_t walGetFSize(twalh handle) {
SWal *pWal = handle;
if (pWal == NULL) return 0;
struct stat _fstat;
if (tfStat(pWal->tfd, &_fstat) == 0) {
return _fstat.st_size;
};
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册