未验证 提交 bea6d6f1 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #8130 from taosdata/enhance/TS-385

Enhance/ts 385
...@@ -301,3 +301,6 @@ keepColumnName 1 ...@@ -301,3 +301,6 @@ keepColumnName 1
# force TCP transmission # force TCP transmission
# rpcForceTcp 0 # rpcForceTcp 0
# unit MB. Flush vnode wal file if walSize > walFlushSize and walSize > cache*0.5*blocks
# walFlushSize 1024
...@@ -108,9 +108,10 @@ extern int32_t tsQuorum; ...@@ -108,9 +108,10 @@ extern int32_t tsQuorum;
extern int8_t tsUpdate; extern int8_t tsUpdate;
extern int8_t tsCacheLastRow; extern int8_t tsCacheLastRow;
//tsdb //tsdb
extern bool tsdbForceKeepFile; extern bool tsdbForceKeepFile;
extern bool tsdbForceCompactFile; extern bool tsdbForceCompactFile;
extern int32_t tsdbWalFlushSize;
// balance // balance
extern int8_t tsEnableBalance; extern int8_t tsEnableBalance;
......
...@@ -155,8 +155,9 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO; ...@@ -155,8 +155,9 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO;
// tsdb config // tsdb config
// For backward compatibility // For backward compatibility
bool tsdbForceKeepFile = false; bool tsdbForceKeepFile = false;
bool tsdbForceCompactFile = false; // compact TSDB fileset forcibly bool tsdbForceCompactFile = false; // compact TSDB fileset forcibly
int32_t tsdbWalFlushSize = TSDB_DEFAULT_WAL_FLUSH_SIZE; // MB
// balance // balance
int8_t tsEnableBalance = 1; int8_t tsEnableBalance = 1;
...@@ -1652,6 +1653,17 @@ static void doInitGlobalConfig(void) { ...@@ -1652,6 +1653,17 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
// flush vnode wal file if walSize > walFlushSize and walSize > cache*0.5*blocks
cfg.option = "walFlushSize";
cfg.ptr = &tsdbWalFlushSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = TSDB_MIN_WAL_FLUSH_SIZE;
cfg.maxValue = TSDB_MAX_WAL_FLUSH_SIZE;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_MB;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -279,6 +279,10 @@ do { \ ...@@ -279,6 +279,10 @@ do { \
#define TSDB_MAX_TOTAL_BLOCKS 10000 #define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6 #define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_WAL_FLUSH_SIZE 128 // MB
#define TSDB_MAX_WAL_FLUSH_SIZE 10000000 // MB
#define TSDB_DEFAULT_WAL_FLUSH_SIZE 1024 // MB
#define TSDB_MIN_TABLES 4 #define TSDB_MIN_TABLES 4
#define TSDB_MAX_TABLES 10000000 #define TSDB_MAX_TABLES 10000000
#define TSDB_DEFAULT_TABLES 1000000 #define TSDB_DEFAULT_TABLES 1000000
......
...@@ -418,6 +418,12 @@ int tsdbCompact(STsdbRepo *pRepo); ...@@ -418,6 +418,12 @@ int tsdbCompact(STsdbRepo *pRepo);
// no problem return true // no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo); bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
// not commit if other instances in committing state or waiting to commit
bool tsdbIsNeedCommit(STsdbRepo *pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -66,6 +66,7 @@ int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); ...@@ -66,6 +66,7 @@ int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
uint64_t walGetVersion(twalh); uint64_t walGetVersion(twalh);
void walResetVersion(twalh, uint64_t newVer); void walResetVersion(twalh, uint64_t newVer);
int64_t walGetFSize(twalh);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -185,6 +185,23 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { ...@@ -185,6 +185,23 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0; return 0;
} }
bool tsdbIsNeedCommit(STsdbRepo *pRepo) {
int nVal = 0;
if (sem_getvalue(&pRepo->readyToCommit, &nVal) != 0) {
tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo));
return false;
}
return nVal > 0;
}
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1;
}
return 0;
}
int tsdbCheckCommit(STsdbRepo *pRepo) { int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config); STsdbCfg *pCfg = &(pRepo->config);
...@@ -194,9 +211,8 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ...@@ -194,9 +211,8 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
if ((pRepo->mem->extraBuffList != NULL) || if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit // trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1;
} }
return 0; return 0;
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 124 #define TSDB_CFG_MAX_NUM 125
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -37,6 +37,7 @@ int32_t tfFsync(int64_t tfd); ...@@ -37,6 +37,7 @@ int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd); bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length); int32_t tfFtruncate(int64_t tfd, int64_t length);
int32_t tfStat(int64_t tfd, struct stat *pFstat);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -133,3 +133,14 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) { ...@@ -133,3 +133,14 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
taosReleaseRef(tsFileRsetId, tfd); taosReleaseRef(tsFileRsetId, tfd);
return code; return code;
} }
int32_t tfStat(int64_t tfd, struct stat *pFstat) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int32_t code = fstat(fd, pFstat);
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
...@@ -56,6 +56,7 @@ typedef struct { ...@@ -56,6 +56,7 @@ typedef struct {
uint64_t version; // current version uint64_t version; // current version
uint64_t cversion; // version while commit start uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
uint32_t tblMsgVer; // create table msg version
void * wqueue; // write queue void * wqueue; // write queue
void * qqueue; // read query queue void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue void * fqueue; // read fetch/cancel queue
......
...@@ -36,6 +36,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -36,6 +36,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
static int32_t vnodeCheckWal(SVnodeObj *pVnode);
int32_t vnodeInitWrite(void) { int32_t vnodeInitWrite(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -167,6 +168,13 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -167,6 +168,13 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
return code; return code;
} }
static int32_t vnodeCheckWal(SVnodeObj *pVnode) {
if (tsdbIsNeedCommit(pVnode->tsdb)) {
return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20);
}
return 0;
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
...@@ -181,6 +189,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -181,6 +189,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
ASSERT(code != 0); ASSERT(code != 0);
} }
if (((++pVnode->tblMsgVer) & 16383) == 0) { // lazy check
vnodeCheckWal(pVnode);
}
tsdbClearTableCfg(pCfg); tsdbClearTableCfg(pCfg);
return code; return code;
} }
......
...@@ -576,4 +576,14 @@ void walResetVersion(twalh param, uint64_t newVer) { ...@@ -576,4 +576,14 @@ void walResetVersion(twalh param, uint64_t newVer) {
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer); wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer);
pWal->version = newVer; pWal->version = newVer;
}
int64_t walGetFSize(twalh handle) {
SWal *pWal = handle;
if (pWal == NULL) return 0;
struct stat _fstat;
if (tfStat(pWal->tfd, &_fstat) == 0) {
return _fstat.st_size;
};
return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册