diff --git a/include/util/tarray2.h b/include/util/tarray2.h index 8ac9050fbaeea76743b4b1554e4783001c97a271..29182119d9bc002c52951b52bf94727440b3bf26 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -75,6 +75,9 @@ static FORCE_INLINE int32_t tarray2_make_room( // #define TARRAY2_INIT(a) TARRAY2_INIT_EX(a, 0, 0, NULL) +#define TARRAY2_INITIALIZER \ + { 0, 0, NULL } + #define TARRAY2_FREE(a) \ do { \ if ((a)->data) { \ diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index dff8678e7deb46c06328939c88e7e68a8c4ac65c..49bee4f9cb66b9196b76806f1e45f75bd5b0413d 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -35,6 +35,9 @@ typedef enum { // open/close int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback); int32_t tsdbCloseFS(STFileSystem **fs); +// snapshot +int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr); +int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr); // txn int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid); int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); @@ -45,7 +48,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); /* Exposed Structs */ struct STFileSystem { - STsdb *pTsdb; + STsdb *tsdb; tsem_t canEdit; int32_t state; int64_t neid; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index ace5ebf17ac49c802c46532336868620618ce894..acb83deeceb52d8bf5531e1d676e1dac01b5e66d 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -65,13 +65,13 @@ struct STFileOp { struct SSttLvl { int32_t level; - TFileObjArray farr; + TFileObjArray fobjArr[1]; }; struct STFileSet { int32_t fid; STFileObj *farr[TSDB_FTYPE_MAX]; // file array - TSttLvlArray lvlArr; // level array + TSttLvlArray lvlArr[1]; // level array }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 6bb116c188c9a07e01c227274d4ca36d6acb35a0..bfad582fc2c2571631168ff4478cce9a4250a02b 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -31,6 +31,7 @@ typedef TARRAY2(STbStatisBlk) TStatisBlkArray; typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReaderConfig SSttFileReaderConfig; typedef struct SSttSegReader SSttSegReader; +typedef TARRAY2(SSttFileReader *) TSttFileReaderArray; typedef TARRAY2(SSttSegReader *) TSttSegReaderArray; // SSttFileReader diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 536882d6978b32c01a79785cd6e9fcde9e877a64..4de9337eb8d7aa86b2aa4c7e2f13a73278316007 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -128,8 +128,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { return tsdbCommitOpenNewSttWriter(committer); } - ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0); - STFileObj *fobj = TARRAY2_LAST(&lvl0->farr); + ASSERT(TARRAY2_SIZE(lvl0->fobjArr) > 0); + STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); if (fobj->f->stt->nseg >= committer->sttTrigger) { return tsdbCommitOpenNewSttWriter(committer); } else { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index f70cb73c7e8ab922eb5cb21afe5417179659e2e9..2345bccc4a55049810f7099f555d97f7f535f310 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -41,7 +41,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { fs[0] = taosMemoryCalloc(1, sizeof(*fs[0])); if (fs[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; - fs[0]->pTsdb = pTsdb; + fs[0]->tsdb = pTsdb; tsem_init(&fs[0]->canEdit, 0, 1); fs[0]->state = TSDB_FS_STATE_NONE; fs[0]->neid = 0; @@ -256,7 +256,7 @@ static int32_t apply_commit(STFileSystem *fs) { TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); } else if (fset1->fid > fset2->fid) { // create new file set with fid of fset2->fid - code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1); + code = tsdbTFileSetInitEx(fs->tsdb, fset2, &fset1); if (code) return code; code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; @@ -264,7 +264,7 @@ static int32_t apply_commit(STFileSystem *fs) { i2++; } else { // edit - code = tsdbTFileSetApplyEdit(fs->pTsdb, fset2, fset1); + code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1); if (code) return code; i1++; i2++; @@ -274,7 +274,7 @@ static int32_t apply_commit(STFileSystem *fs) { TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); } else { // create new file set with fid of fset2->fid - code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1); + code = tsdbTFileSetInitEx(fs->tsdb, fset2, &fset1); if (code) return code; code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; @@ -290,11 +290,11 @@ static int32_t commit_edit(STFileSystem *fs) { char current[TSDB_FILENAME_LEN]; char current_t[TSDB_FILENAME_LEN]; - current_fname(fs->pTsdb, current, TSDB_FCURRENT); + current_fname(fs->tsdb, current, TSDB_FCURRENT); if (fs->etype == TSDB_FEDIT_COMMIT) { - current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_C); + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C); } else if (fs->etype == TSDB_FEDIT_MERGE) { - current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_M); + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); } else { ASSERT(0); } @@ -310,9 +310,9 @@ static int32_t commit_edit(STFileSystem *fs) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->etype); + tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); } return code; } @@ -327,9 +327,9 @@ static int32_t abort_edit(STFileSystem *fs) { char fname[TSDB_FILENAME_LEN]; if (fs->etype == TSDB_FEDIT_COMMIT) { - current_fname(fs->pTsdb, fname, TSDB_FCURRENT_C); + current_fname(fs->tsdb, fname, TSDB_FCURRENT_C); } else if (fs->etype == TSDB_FEDIT_MERGE) { - current_fname(fs->pTsdb, fname, TSDB_FCURRENT_M); + current_fname(fs->tsdb, fname, TSDB_FCURRENT_M); } else { ASSERT(0); } @@ -345,9 +345,9 @@ static int32_t abort_edit(STFileSystem *fs) { _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(fs->pTsdb->pVnode), __func__, tstrerror(code)); + tsdbError("vgId:%d %s failed since %s", TD_VID(fs->tsdb->pVnode), __func__, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->etype); + tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); } return code; } @@ -379,7 +379,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) { const STFileSet *fset1; TARRAY2_FOREACH(src, fset1) { STFileSet *fset2; - code = tsdbTFileSetInitEx(fs->pTsdb, fset1, &fset2); + code = tsdbTFileSetInitEx(fs->tsdb, fset1, &fset2); if (code) return code; code = TARRAY2_APPEND(dst, fset2); if (code) return code; @@ -391,7 +391,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) { static int32_t open_fs(STFileSystem *fs, int8_t rollback) { int32_t code = 0; int32_t lino = 0; - STsdb *pTsdb = fs->pTsdb; + STsdb *pTsdb = fs->tsdb; code = update_fs_if_needed(fs); TSDB_CHECK_CODE(code, lino, _exit); @@ -492,7 +492,7 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) { } } - code = tsdbTFileSetEdit(fs->pTsdb, fset, op); + code = tsdbTFileSetEdit(fs->tsdb, fset, op); TSDB_CHECK_CODE(code, lino, _exit); } @@ -550,10 +550,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e switch (etype) { case TSDB_FEDIT_COMMIT: - current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_C); + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C); break; case TSDB_FEDIT_MERGE: - current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_M); + current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); break; default: ASSERT(0); @@ -573,10 +573,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code), etype); } else { - tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, etype); + tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype); } return code; } @@ -598,4 +598,32 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) { STFileSet *pset = &tfset; fset[0] = TARRAY2_SEARCH_EX(&fs->cstate, &pset, tsdbTFileSetCmprFn, TD_EQ); return 0; +} + +int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr) { + int32_t code = 0; + STFileSet *fset; + STFileSet *fset1; + + ASSERT(TARRAY2_SIZE(fsetArr) == 0); + + taosThreadRwlockRdlock(&fs->tsdb->rwLock); + TARRAY2_FOREACH(&fs->cstate, fset) { + code = tsdbTFileSetInitEx(fs->tsdb, fset, &fset1); + if (code) break; + + code = TARRAY2_APPEND(fsetArr, fset1); + if (code) break; + } + taosThreadRwlockUnlock(&fs->tsdb->rwLock); + + if (code) { + TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear); + } + return code; +} + +int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr) { + TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear); + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index d7edd72d2214b3af19932782a31194cfec123917..3a1cf659bbd50190da78802a30f54f478ba37a8f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -18,13 +18,13 @@ static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) { if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY; lvl[0]->level = level; - TARRAY2_INIT(&lvl[0]->farr); + TARRAY2_INIT(lvl[0]->fobjArr); return 0; } static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); } static int32_t tsdbSttLvlClear(SSttLvl **lvl) { - TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlClearFObj); + TARRAY2_CLEAR_FREE(lvl[0]->fobjArr, tsdbSttLvlClearFObj); taosMemoryFree(lvl[0]); lvl[0] = NULL; return 0; @@ -35,7 +35,7 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl if (code) return code; const STFileObj *fobj1; - TARRAY2_FOREACH(&lvl1->farr, fobj1) { + TARRAY2_FOREACH(lvl1->fobjArr, fobj1) { STFileObj *fobj; code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj); if (code) { @@ -43,14 +43,14 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl return code; } - TARRAY2_APPEND(&lvl[0]->farr, fobj); + TARRAY2_APPEND(lvl[0]->fobjArr, fobj); } return 0; } static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); } static void tsdbSttLvlRemove(SSttLvl **lvl) { - TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlRemoveFObj); + TARRAY2_CLEAR_FREE(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj); taosMemoryFree(lvl[0]); lvl[0] = NULL; } @@ -61,32 +61,32 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l ASSERT(lvl1->level == lvl2->level); int32_t i1 = 0, i2 = 0; - while (i1 < TARRAY2_SIZE(&lvl1->farr) || i2 < TARRAY2_SIZE(&lvl2->farr)) { - STFileObj *fobj1 = i1 < TARRAY2_SIZE(&lvl1->farr) ? TARRAY2_GET(&lvl1->farr, i1) : NULL; - STFileObj *fobj2 = i2 < TARRAY2_SIZE(&lvl2->farr) ? TARRAY2_GET(&lvl2->farr, i2) : NULL; + while (i1 < TARRAY2_SIZE(lvl1->fobjArr) || i2 < TARRAY2_SIZE(lvl2->fobjArr)) { + STFileObj *fobj1 = i1 < TARRAY2_SIZE(lvl1->fobjArr) ? TARRAY2_GET(lvl1->fobjArr, i1) : NULL; + STFileObj *fobj2 = i2 < TARRAY2_SIZE(lvl2->fobjArr) ? TARRAY2_GET(lvl2->fobjArr, i2) : NULL; if (fobj1 && fobj2) { if (fobj1->f->cid < fobj2->f->cid) { // create a file obj code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2); if (code) return code; - code = TARRAY2_APPEND(&lvl2->farr, fobj2); + code = TARRAY2_APPEND(lvl2->fobjArr, fobj2); if (code) return code; i1++; i2++; } else if (fobj1->f->cid > fobj2->f->cid) { // remove a file obj - TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj); + TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj); } else { if (tsdbIsSameTFile(fobj1->f, fobj2->f)) { if (tsdbIsTFileChanged(fobj1->f, fobj2->f)) { fobj2->f[0] = fobj1->f[0]; } } else { - TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj); + TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj); code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2); if (code) return code; - code = TARRAY2_SORT_INSERT(&lvl2->farr, fobj2, tsdbTFileObjCmpr); + code = TARRAY2_SORT_INSERT(lvl2->fobjArr, fobj2, tsdbTFileObjCmpr); if (code) return code; } i1++; @@ -96,13 +96,13 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l // create a file obj code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2); if (code) return code; - code = TARRAY2_APPEND(&lvl2->farr, fobj2); + code = TARRAY2_APPEND(lvl2->fobjArr, fobj2); if (code) return code; i1++; i2++; } else { // remove a file obj - TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj); + TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj); } } return 0; @@ -122,7 +122,7 @@ static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) { cJSON *ajson = cJSON_AddArrayToObject(json, "files"); if (ajson == NULL) return TSDB_CODE_OUT_OF_MEMORY; const STFileObj *fobj; - TARRAY2_FOREACH(&lvl->farr, fobj) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { cJSON *item = cJSON_CreateObject(); if (item == NULL) return TSDB_CODE_OUT_OF_MEMORY; cJSON_AddItemToArray(ajson, item); @@ -169,7 +169,7 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl) return code; } - TARRAY2_APPEND(&lvl[0]->farr, fobj); + TARRAY2_APPEND(lvl[0]->fobjArr, fobj); } return 0; } @@ -194,7 +194,7 @@ int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json) { item1 = cJSON_AddArrayToObject(json, "stt lvl"); if (item1 == NULL) return TSDB_CODE_OUT_OF_MEMORY; const SSttLvl *lvl; - TARRAY2_FOREACH(&fset->lvlArr, lvl) { + TARRAY2_FOREACH(fset->lvlArr, lvl) { item2 = cJSON_CreateObject(); if (!item2) return TSDB_CODE_OUT_OF_MEMORY; cJSON_AddItemToArray(item1, item2); @@ -247,7 +247,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) { return code; } - TARRAY2_APPEND(&(*fset)->lvlArr, lvl); + TARRAY2_APPEND((*fset)->lvlArr, lvl); } } else { return TSDB_CODE_FILE_CORRUPTED; @@ -272,11 +272,11 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { code = tsdbSttLvlInit(fobj->f->stt->level, &lvl); if (code) return code; - code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn); + code = TARRAY2_SORT_INSERT(fset->lvlArr, lvl, tsdbSttLvlCmprFn); if (code) return code; } - code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr); + code = TARRAY2_SORT_INSERT(lvl->fobjArr, fobj, tsdbTFileObjCmpr); if (code) return code; } else { ASSERT(fset->farr[fobj->f->type] == NULL); @@ -290,11 +290,11 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}; STFileObj *tfobjp = &tfobj; - int32_t idx = TARRAY2_SEARCH_IDX(&lvl->farr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); + int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); ASSERT(idx >= 0); - TARRAY2_REMOVE(&lvl->farr, idx, tsdbSttLvlRemoveFObj); + TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlRemoveFObj); - if (TARRAY2_SIZE(&lvl->farr) == 0) { + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { // TODO: remove the stt level if no file exists anymore // TARRAY2_REMOVE(&fset->lvlArr, lvl - fset->lvlArr.data, tsdbSttLvlClear); } @@ -309,7 +309,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { ASSERT(lvl); STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj; - tfobjp = TARRAY2_SEARCH_EX(&lvl->farr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); + tfobjp = TARRAY2_SEARCH_EX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); ASSERT(tfobjp); @@ -356,22 +356,22 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f // stt part int32_t i1 = 0, i2 = 0; - while (i1 < TARRAY2_SIZE(&fset1->lvlArr) || i2 < TARRAY2_SIZE(&fset2->lvlArr)) { - SSttLvl *lvl1 = i1 < TARRAY2_SIZE(&fset1->lvlArr) ? TARRAY2_GET(&fset1->lvlArr, i1) : NULL; - SSttLvl *lvl2 = i2 < TARRAY2_SIZE(&fset2->lvlArr) ? TARRAY2_GET(&fset2->lvlArr, i2) : NULL; + while (i1 < TARRAY2_SIZE(fset1->lvlArr) || i2 < TARRAY2_SIZE(fset2->lvlArr)) { + SSttLvl *lvl1 = i1 < TARRAY2_SIZE(fset1->lvlArr) ? TARRAY2_GET(fset1->lvlArr, i1) : NULL; + SSttLvl *lvl2 = i2 < TARRAY2_SIZE(fset2->lvlArr) ? TARRAY2_GET(fset2->lvlArr, i2) : NULL; if (lvl1 && lvl2) { if (lvl1->level < lvl2->level) { // add a new stt level code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2); if (code) return code; - code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn); + code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn); if (code) return code; i1++; i2++; } else if (lvl1->level > lvl2->level) { // remove the stt level - TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove); + TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove); } else { // apply edit on stt level code = tsdbSttLvlApplyEdit(pTsdb, lvl1, lvl2); @@ -383,13 +383,13 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f // add a new stt level code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2); if (code) return code; - code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn); + code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn); if (code) return code; i1++; i2++; } else { // remove the stt level - TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove); + TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove); } } @@ -401,7 +401,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { if (fset[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; fset[0]->fid = fid; - TARRAY2_INIT(&fset[0]->lvlArr); + TARRAY2_INIT(fset[0]->lvlArr); return 0; } @@ -420,7 +420,7 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse } const SSttLvl *lvl1; - TARRAY2_FOREACH(&fset1->lvlArr, lvl1) { + TARRAY2_FOREACH(fset1->lvlArr, lvl1) { SSttLvl *lvl; code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl); if (code) { @@ -428,7 +428,7 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse return code; } - code = TARRAY2_APPEND(&fset[0]->lvlArr, lvl); + code = TARRAY2_APPEND(fset[0]->lvlArr, lvl); if (code) return code; } @@ -443,7 +443,7 @@ int32_t tsdbTFileSetClear(STFileSet **fset) { tsdbTFileObjUnref(fset[0]->farr[ftype]); } - TARRAY2_CLEAR_FREE(&fset[0]->lvlArr, tsdbSttLvlClear); + TARRAY2_CLEAR_FREE(fset[0]->lvlArr, tsdbSttLvlClear); taosMemoryFree(fset[0]); fset[0] = NULL; @@ -457,7 +457,7 @@ int32_t tsdbTFileSetRemove(STFileSet **fset) { tsdbTFileObjRemove(fset[0]->farr[ftype]); } - TARRAY2_CLEAR_FREE(&fset[0]->lvlArr, tsdbSttLvlRemove); + TARRAY2_CLEAR_FREE(fset[0]->lvlArr, tsdbSttLvlRemove); taosMemoryFree(fset[0]); fset[0] = NULL; return 0; @@ -466,7 +466,7 @@ int32_t tsdbTFileSetRemove(STFileSet **fset) { SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level) { SSttLvl tlvl = {.level = level}; SSttLvl *lvl = &tlvl; - return TARRAY2_SEARCH_EX(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ); + return TARRAY2_SEARCH_EX(fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ); } int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) { @@ -483,8 +483,8 @@ int64_t tsdbTFileSetMaxCid(const STFileSet *fset) { } const SSttLvl *lvl; const STFileObj *fobj; - TARRAY2_FOREACH(&fset->lvlArr, lvl) { - TARRAY2_FOREACH(&lvl->farr, fobj) { maxCid = TMAX(maxCid, fobj->f->cid); } + TARRAY2_FOREACH(fset->lvlArr, lvl) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { maxCid = TMAX(maxCid, fobj->f->cid); } } return maxCid; } @@ -493,5 +493,5 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) { for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { if (fset->farr[ftype] != NULL) return false; } - return TARRAY2_SIZE(&fset->lvlArr) == 0; + return TARRAY2_SIZE(fset->lvlArr) == 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 177af3ed274a24bcb6c4aa518aa2a50ca71e78be..d8c071a07353e722ace68d984910a1a3b6744866 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -16,40 +16,49 @@ #include "inc/tsdbMerge.h" typedef struct { - STsdb *tsdb; - int32_t maxRow; - int32_t minRow; - int32_t szPage; - int8_t cmprAlg; - int64_t compactVersion; - int64_t cid; - SSkmInfo skmTb; - SSkmInfo skmRow; - uint8_t *aBuf[5]; + STsdb *tsdb; + TFileSetArray fsetArr[1]; + int32_t sttTrigger; + int32_t maxRow; + int32_t minRow; + int32_t szPage; + int8_t cmprAlg; + int64_t compactVersion; + int64_t cid; + SSkmInfo skmTb; + SSkmInfo skmRow; + uint8_t *aBuf[5]; + // context struct { - bool opened; + bool opened; + + STFileSet *fset; bool toData; int32_t level; - STFileSet *fset; SRowInfo *row; SBlockData bData; } ctx[1]; + // reader - TARRAY2(SSttFileReader *) sttReaderArr[1]; - SDataFileReader *dataReader; - TTsdbIterArray iterArr[1]; - SIterMerger *iterMerger; + TSttFileReaderArray sttReaderArr[1]; + SDataFileReader *dataReader; + TTsdbIterArray iterArr[1]; + SIterMerger *iterMerger; + TFileOpArray fopArr[1]; // writer SSttFileWriter *sttWriter; SDataFileWriter *dataWriter; - // operations - TFileOpArray fopArr; } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { + merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows; + merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows; + merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize; + merger->cmprAlg = merger->tsdb->pVnode->config.tsdbCfg.compression; + merger->compactVersion = INT64_MAX; + tsdbFSAllocEid(merger->tsdb->pFS, &merger->cid); merger->ctx->opened = true; - TARRAY2_INIT(&merger->fopArr); return 0; } @@ -62,14 +71,14 @@ static int32_t tsdbMergerClose(SMerger *merger) { STFileSystem *fs = merger->tsdb->pFS; // edit file system - code = tsdbFSEditBegin(fs, &merger->fopArr, TSDB_FEDIT_MERGE); + code = tsdbFSEditBegin(fs, merger->fopArr, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFSEditCommit(fs); TSDB_CHECK_CODE(code, lino, _exit); // clear the merge - TARRAY2_FREE(&merger->fopArr); + TARRAY2_FREE(merger->fopArr); _exit: if (code) { @@ -182,18 +191,18 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { merger->ctx->toData = true; merger->ctx->level = 0; - TARRAY2_FOREACH(&fset->lvlArr, lvl) { + TARRAY2_FOREACH(fset->lvlArr, lvl) { if (lvl->level != merger->ctx->level) { lvl = NULL; break; } - fobj = TARRAY2_GET(&lvl->farr, 0); + fobj = TARRAY2_GET(lvl->fobjArr, 0); if (fobj->f->stt->nseg < merger->tsdb->pVnode->config.sttTrigger) { merger->ctx->toData = false; break; } else { - ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1); + ASSERT(lvl->level == 0 || TARRAY2_SIZE(lvl->fobjArr) == 1); merger->ctx->level++; // open the reader @@ -214,7 +223,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { .optype = TSDB_FOP_REMOVE, .of = fobj->f[0], }; - code = TARRAY2_APPEND(&merger->fopArr, op); + code = TARRAY2_APPEND(merger->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -282,7 +291,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); if (op.optype != TSDB_FOP_NONE) { - code = TARRAY2_APPEND(&merger->fopArr, op); + code = TARRAY2_APPEND(merger->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); } @@ -301,7 +310,7 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - if (merger->ctx->opened == false) { + if (!merger->ctx->opened) { code = tsdbMergerOpen(merger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -332,43 +341,59 @@ _exit: return 0; } -int32_t tsdbMerge(STsdb *tsdb) { +static int32_t tsdbDoMerge(SMerger *merger) { int32_t code = 0; - int32_t lino; - - int32_t vid = TD_VID(tsdb->pVnode); - STFileSystem *fs = tsdb->pFS; - STFileSet *fset; - STFileObj *fobj; - int32_t sttTrigger = tsdb->pVnode->config.sttTrigger; - - SMerger merger[1]; - merger->tsdb = tsdb; - merger->ctx->opened = false; - - // loop to merge each file set - TARRAY2_FOREACH(&fs->cstate, fset) { - SSttLvl *lvl0 = tsdbTFileSetGetLvl(fset, 0); - if (lvl0 == NULL) { - continue; - } + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); - ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0); + STFileSet *fset; + SSttLvl *lvl; + STFileObj *fobj; + TARRAY2_FOREACH(merger->fsetArr, fset) { + lvl = TARRAY2_SIZE(fset->lvlArr) ? TARRAY2_FIRST(fset->lvlArr) : NULL; + if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue; - fobj = TARRAY2_GET(&lvl0->farr, 0); + fobj = TARRAY2_FIRST(lvl->fobjArr); + if (fobj->f->stt->nseg < merger->sttTrigger) continue; - if (fobj->f->stt->nseg >= sttTrigger) { - code = tsdbMergeFileSet(merger, fset); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbMergeFileSet(merger, fset); + TSDB_CHECK_CODE(code, lino, _exit); } - // end the merge if (merger->ctx->opened) { code = tsdbMergerClose(merger); TSDB_CHECK_CODE(code, lino, _exit); } +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } else { + tsdbDebug("vgId:%d %s done", vid, __func__); + } + return code; +} + +int32_t tsdbMerge(STsdb *tsdb) { + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(tsdb->pVnode); + + SMerger merger[1] = {{ + .tsdb = tsdb, + .fsetArr = {TARRAY2_INITIALIZER}, + .sttTrigger = tsdb->pVnode->config.sttTrigger, + }}; + + code = tsdbFSCopySnapshot(tsdb->pFS, merger->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDoMerge(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + tsdbFSClearSnapshot(merger->fsetArr); + TARRAY2_FREE(merger->fsetArr); + _exit: if (code) { TSDB_ERROR_LOG(vid, lino, code);