提交 efff4e77 编写于 作者: H Hongze Cheng

more code

上级 5649401a
...@@ -304,7 +304,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); ...@@ -304,7 +304,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(void *arg);
// tsdbDiskData ============================================================================================== // tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
......
...@@ -53,6 +53,7 @@ struct STFileSystem { ...@@ -53,6 +53,7 @@ struct STFileSystem {
int32_t state; int32_t state;
int64_t neid; int64_t neid;
EFEditT etype; EFEditT etype;
bool mergeTaskOn;
TFileSetArray fSetArr[1]; TFileSetArray fSetArr[1];
TFileSetArray fSetArrTmp[1]; TFileSetArray fSetArrTmp[1];
}; };
......
...@@ -473,10 +473,6 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { ...@@ -473,10 +473,6 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
taosThreadRwlockUnlock(&tsdb->rwLock); taosThreadRwlockUnlock(&tsdb->rwLock);
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
// TODO: make this call async
code = tsdbMerge(tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include "inc/tsdbFS.h" #include "inc/tsdbFS.h"
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
...@@ -45,6 +47,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { ...@@ -45,6 +47,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
tsem_init(&fs[0]->canEdit, 0, 1); tsem_init(&fs[0]->canEdit, 0, 1);
fs[0]->state = TSDB_FS_STATE_NONE; fs[0]->state = TSDB_FS_STATE_NONE;
fs[0]->neid = 0; fs[0]->neid = 0;
fs[0]->mergeTaskOn = false;
TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArr);
TARRAY2_INIT(fs[0]->fSetArrTmp); TARRAY2_INIT(fs[0]->fSetArrTmp);
...@@ -584,8 +587,46 @@ _exit: ...@@ -584,8 +587,46 @@ _exit:
} }
int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t tsdbFSEditCommit(STFileSystem *fs) {
int32_t code = commit_edit(fs); int32_t code = 0;
tsem_post(&fs->canEdit); int32_t lino = 0;
// commit
code = commit_edit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (fs->etype == TSDB_FEDIT_MERGE) {
ASSERT(fs->mergeTaskOn);
fs->mergeTaskOn = false;
}
// check if need to merge
if (fs->mergeTaskOn == false) {
STFileSet *fset;
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr);
if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) == 0) continue;
STFileObj *fobj = TARRAY2_FIRST(lvl0->fobjArr);
if (fobj->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) continue;
code = vnodeScheduleTask(tsdbMerge, fs->tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
fs->mergeTaskOn = true;
break;
}
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
} else {
tsem_post(&fs->canEdit);
}
return code; return code;
} }
......
...@@ -611,10 +611,10 @@ _exit: ...@@ -611,10 +611,10 @@ _exit:
return code; return code;
} }
int32_t tsdbMerge(STsdb *tsdb) { int32_t tsdbMerge(void *arg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t vid = TD_VID(tsdb->pVnode); STsdb *tsdb = (STsdb *)arg;
SMerger merger[1] = {{ SMerger merger[1] = {{
.tsdb = tsdb, .tsdb = tsdb,
...@@ -631,9 +631,9 @@ int32_t tsdbMerge(STsdb *tsdb) { ...@@ -631,9 +631,9 @@ int32_t tsdbMerge(STsdb *tsdb) {
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(vid, lino, code); TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
} else if (merger->ctx->opened) { } else if (merger->ctx->opened) {
tsdbDebug("vgId:%d %s done", vid, __func__); tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
} }
return code; return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册