diff --git a/include/util/tarray2.h b/include/util/tarray2.h index 56d8fb1c053ce9d4c98f0895120a922a28afc900..9ddf0143d3ca8231799252584725321d88c1a61d 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -46,9 +46,10 @@ typedef void (*TArray2Cb)(void *); #define TARRAY2_ELEM(a, i) ((a)->data[i]) #define TARRAY2_ELEM_PTR(a, i) (&((a)->data[i])) -static FORCE_INLINE int32_t tarray2_make_room(void *arg, // array - int32_t es, // expected size - int32_t sz // size of element +static FORCE_INLINE int32_t tarray2_make_room( // + void *arg, // array + int32_t es, // expected size + int32_t sz // size of element ) { TARRAY2(void) *a = arg; int32_t capacity = (a->capacity > 0) ? (a->capacity << 1) : TARRAY2_MIN_SIZE; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index b31078494ae381453339c64a0b707af297a71cb0..729d5a90c65bf46d0049a814d42413505438edeb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -37,7 +37,7 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback); int32_t tsdbCloseFS(STFileSystem **fs); // txn int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid); -int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype); +int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *pFS); int32_t tsdbFSEditAbort(STFileSystem *pFS); // other diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index 96fa2eb0143f49e740695183a727d87eb9eb77d7..3cee85cdf50e09773b3bdf9104bd50f96843b56d 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -47,12 +47,12 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset); // cmpr int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2); // edit -int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op); +int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op); int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset); // max commit id int64_t tsdbTFileSetMaxCid(const STFileSet *fset); -const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level); +SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level); struct STFileOp { tsdb_fop_t op; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h index 1027e5be6bf4961bccb9bf8b3f1a5134d287b565..24eb31855c1fd8ff5dc7c48d46fc99bbd346faf5 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFile.h @@ -51,6 +51,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj); int32_t tsdbTFileObjRef(STFileObj *fobj); int32_t tsdbTFileObjUnref(STFileObj *fobj); int32_t tsdbTFileRemove(STFileObj *fobj); +int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2); struct STFile { tsdb_ftype_t type; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 869dd78a249662c826d289aec9f33f08f441161b..ba334d9cb3f6efdbd17d071ed6baabf59dbda5fd 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -72,7 +72,6 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) { config.file.size = 0; config.file.stt.level = 0; config.file.stt.nseg = 0; - // tsdbTFileInit(pTsdb, &config.file); code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -275,15 +274,12 @@ static int32_t commit_fset_end(SCommitter *pCommitter) { if (pCommitter->pWriter == NULL) return 0; - // TODO - // struct STFileOp *pFileOp = taosArrayReserve(pCommitter->aFileOp, 1); - // if (pFileOp == NULL) { - // code = TSDB_CODE_OUT_OF_MEMORY; - // TSDB_CHECK_CODE(code, lino, _exit); - // } + STFileOp op; + code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op); + TSDB_CHECK_CODE(code, lino, _exit); - // code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp); - // TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(&pCommitter->opArray, op); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -363,7 +359,7 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) { int32_t vid = TD_VID(pCommiter->pTsdb->pVnode); if (eno == 0) { - code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, NULL /* TODO */, TSDB_FEDIT_COMMIT); + code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, &pCommiter->opArray, TSDB_FEDIT_COMMIT); TSDB_CHECK_CODE(code, lino, _exit); } else { // TODO diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index 823c749cec38bdee2e4a924e2100b05ad6947682..f81f7d6eec8774d83b1124844168b858d4fdedce 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -241,29 +241,26 @@ static bool is_same_file(const STFile *f1, const STFile f2) { } static int32_t apply_commit(STFileSystem *fs) { - int32_t code = 0; - int32_t i1 = 0, i2 = 0; - int32_t n1 = TARRAY2_SIZE(&fs->cstate); - int32_t n2 = TARRAY2_SIZE(&fs->nstate); + int32_t code = 0; + TFileSetArray *fsetArray1 = &fs->cstate; + TFileSetArray *fsetArray2 = &fs->nstate; + int32_t i1 = 0, i2 = 0; - while (i1 < n1 || i2 < n2) { - STFileSet *fset1 = i1 < n1 ? TARRAY2_ELEM(&fs->cstate, i1) : NULL; - STFileSet *fset2 = i2 < n2 ? TARRAY2_ELEM(&fs->nstate, i2) : NULL; + while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) { + STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_ELEM(fsetArray1, i1) : NULL; + STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_ELEM(fsetArray2, i2) : NULL; if (fset1 && fset2) { if (fset1->fid < fset2->fid) { - // delete fset1 - TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear); - n1 = TARRAY2_SIZE(&fs->cstate); + // delete fset1 (TODO: should set file remove) + TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear); } else if (fset1->fid > fset2->fid) { // create new file set with fid of fset2->fid code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1); if (code) return code; - code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn); + code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; i1++; - i2++; - n1 = TARRAY2_SIZE(&fs->cstate); } else { // edit code = tsdbTFileSetEditEx(fset2, fset1); @@ -272,18 +269,15 @@ static int32_t apply_commit(STFileSystem *fs) { i2++; } } else if (fset1) { - // delete fset1 - TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear); - n1 = TARRAY2_SIZE(&fs->cstate); + // delete fset1 (TODO: should set file remove) + TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear); } else { // create new file set with fid of fset2->fid code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1); if (code) return code; - code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn); + code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; i1++; - i2++; - n1 = TARRAY2_SIZE(&fs->cstate); } } @@ -474,33 +468,34 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe return 0; } -static int32_t edit_fs(TFileSetArray *fset_arr, const TFileOpArray *op_arr) { - int32_t code = 0; - int32_t lino = 0; +static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) { + int32_t code = 0; + int32_t lino = 0; + TFileSetArray *fsetArray = &fs->nstate; STFileSet *fset = NULL; const STFileOp *op; - TARRAY2_FOREACH_PTR(op_arr, op) { + TARRAY2_FOREACH_PTR(opArray, op) { if (!fset || fset->fid != op->fid) { STFileSet tfset = {.fid = op->fid}; fset = &tfset; - fset = TARRAY2_SEARCH(fset_arr, &fset, tsdbTFileSetCmprFn, TD_EQ); + fset = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ); if (!fset) { code = tsdbTFileSetInit(op->fid, &fset); TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_SORT_INSERT(fset_arr, fset, tsdbTFileSetCmprFn); + code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn); TSDB_CHECK_CODE(code, lino, _exit); } } - code = tsdbTFileSetEdit(fset, op); + code = tsdbTFileSetEdit(fs->pTsdb, fset, op); TSDB_CHECK_CODE(code, lino, _exit); + } - if (0) { - // TODO check if the file set should be deleted - } + { + // TODO: check if a file set should be deleted } _exit: @@ -539,7 +534,7 @@ int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) { return 0; } -int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) { +int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) { int32_t code = 0; int32_t lino; char current_t[TSDB_FILENAME_LEN]; @@ -560,7 +555,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) fs->etype = etype; // edit - code = edit_fs(&fs->nstate, NULL /* TODO */); + code = edit_fs(fs, opArray); TSDB_CHECK_CODE(code, lino, _exit); // save fs diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index 532071d7b0d559228824423fadebff85407da9e3..acedef0e8df33c9ab3e7cc806fee5d744e3906ce 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -46,9 +46,9 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl return 0; } -static int32_t tsdbSttLvlCmprFn(const SSttLvl *lvl1, const SSttLvl *lvl2) { - if (lvl1->level < lvl2->level) return -1; - if (lvl1->level > lvl2->level) return 1; +static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) { + if (lvl1[0]->level < lvl2[0]->level) return -1; + if (lvl1[0]->level > lvl2[0]->level) return 1; return 0; } @@ -194,30 +194,45 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) { return 0; } -int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op) { +int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { int32_t code = 0; - // if (op->oState.size == 0 // - // || 0 /* TODO*/ - // ) { - // STFileObj *fobj; - // // code = tsdbTFileObjCreate(&fobj); - // if (code) return code; - // fobj->f = op->nState; - // add_file_to_fset(fset, fobj); - // } else if (op->nState.size == 0) { - // // delete - // ASSERT(0); - // } else { - // // modify - // ASSERT(0); - // } + if (op->oState.size == 0 // + || 0 /* TODO*/ + ) { + STFileObj *fobj; + code = tsdbTFileObjInit(pTsdb, &op->nState, &fobj); + if (code) return code; + + if (fobj->f.type == TSDB_FTYPE_STT) { + SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level); + if (!lvl) { + code = tsdbSttLvlInit(fobj->f.stt.level, &lvl); + if (code) return code; + + code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn); + if (code) return code; + } + + code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr); + if (code) return code; + } else { + fset->farr[fobj->f.type] = fobj; + } + } else if (op->nState.size == 0) { + // delete + ASSERT(0); + } else { + // modify + ASSERT(0); + } + return 0; } int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset) { ASSERT(fset1->fid == fset->fid); - // TODO + ASSERT(0); return 0; } @@ -253,7 +268,8 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse return code; } - TARRAY2_APPEND(&fset[0]->lvlArr, lvl); + code = TARRAY2_APPEND(&fset[0]->lvlArr, lvl); + if (code) return code; } return 0; @@ -275,9 +291,9 @@ int32_t tsdbTFileSetClear(STFileSet **fset) { return 0; } -const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) { - SSttLvl tlvl = {.level = level}; - const SSttLvl *lvl = &tlvl; +SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) { + SSttLvl tlvl = {.level = level}; + SSttLvl *lvl = &tlvl; return TARRAY2_SEARCH(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c index 44d656c135b7acc3b29232d242f987623dcf0442..83f332dbe19c287a8f1a849d4c801b177eaabce1 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c @@ -274,4 +274,14 @@ int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]) { g_tfile_info[f->type].suffix); } return 0; +} + +int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2) { + if (fobj1[0]->f.cid < fobj2[0]->f.cid) { + return -1; + } else if (fobj1[0]->f.cid > fobj2[0]->f.cid) { + return 1; + } else { + return 0; + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index b30b17b80ad57d6d6ce3fa425f3a7666b711614d..ddd44f38bc51be4e18bf2bbd0d0b4bd59e65ca67 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -472,6 +472,7 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(pWriter->config.pTsdb->pVnode); + char fname[TSDB_FILENAME_LEN]; uint8_t hdr[TSDB_FHDR_SIZE] = {0}; int32_t flag = TD_FILE_READ | TD_FILE_WRITE; @@ -479,7 +480,8 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) { flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); } - code = tsdbOpenFile(NULL /*pWriter->config.file.fname*/, pWriter->config.szPage, flag, &pWriter->pFd); + tsdbTFileName(pWriter->config.pTsdb, &pWriter->config.file, fname); + code = tsdbOpenFile(fname, pWriter->config.szPage, flag, &pWriter->pFd); TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->tFile.size == 0) {