提交 c16126fa 编写于 作者: H Hongze Cheng

more code

上级 d56fded4
......@@ -46,7 +46,8 @@ typedef void (*TArray2Cb)(void *);
#define TARRAY2_ELEM(a, i) ((a)->data[i])
#define TARRAY2_ELEM_PTR(a, i) (&((a)->data[i]))
static FORCE_INLINE int32_t tarray2_make_room(void *arg, // array
static FORCE_INLINE int32_t tarray2_make_room( //
void *arg, // array
int32_t es, // expected size
int32_t sz // size of element
) {
......
......@@ -37,7 +37,7 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
int32_t tsdbCloseFS(STFileSystem **fs);
// txn
int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid);
int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype);
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *pFS);
int32_t tsdbFSEditAbort(STFileSystem *pFS);
// other
......
......@@ -47,12 +47,12 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
// cmpr
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
// edit
int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset);
// max commit id
int64_t tsdbTFileSetMaxCid(const STFileSet *fset);
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level);
SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level);
struct STFileOp {
tsdb_fop_t op;
......
......@@ -51,6 +51,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj);
int32_t tsdbTFileObjRef(STFileObj *fobj);
int32_t tsdbTFileObjUnref(STFileObj *fobj);
int32_t tsdbTFileRemove(STFileObj *fobj);
int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
struct STFile {
tsdb_ftype_t type;
......
......@@ -72,7 +72,6 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
config.file.size = 0;
config.file.stt.level = 0;
config.file.stt.nseg = 0;
// tsdbTFileInit(pTsdb, &config.file);
code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -275,15 +274,12 @@ static int32_t commit_fset_end(SCommitter *pCommitter) {
if (pCommitter->pWriter == NULL) return 0;
// TODO
// struct STFileOp *pFileOp = taosArrayReserve(pCommitter->aFileOp, 1);
// if (pFileOp == NULL) {
// code = TSDB_CODE_OUT_OF_MEMORY;
// TSDB_CHECK_CODE(code, lino, _exit);
// }
STFileOp op;
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op);
TSDB_CHECK_CODE(code, lino, _exit);
// code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp);
// TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(&pCommitter->opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
......@@ -363,7 +359,7 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);
if (eno == 0) {
code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, NULL /* TODO */, TSDB_FEDIT_COMMIT);
code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, &pCommiter->opArray, TSDB_FEDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
// TODO
......
......@@ -242,28 +242,25 @@ static bool is_same_file(const STFile *f1, const STFile f2) {
static int32_t apply_commit(STFileSystem *fs) {
int32_t code = 0;
TFileSetArray *fsetArray1 = &fs->cstate;
TFileSetArray *fsetArray2 = &fs->nstate;
int32_t i1 = 0, i2 = 0;
int32_t n1 = TARRAY2_SIZE(&fs->cstate);
int32_t n2 = TARRAY2_SIZE(&fs->nstate);
while (i1 < n1 || i2 < n2) {
STFileSet *fset1 = i1 < n1 ? TARRAY2_ELEM(&fs->cstate, i1) : NULL;
STFileSet *fset2 = i2 < n2 ? TARRAY2_ELEM(&fs->nstate, i2) : NULL;
while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_ELEM(fsetArray1, i1) : NULL;
STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_ELEM(fsetArray2, i2) : NULL;
if (fset1 && fset2) {
if (fset1->fid < fset2->fid) {
// delete fset1
TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear);
n1 = TARRAY2_SIZE(&fs->cstate);
// delete fset1 (TODO: should set file remove)
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear);
} else if (fset1->fid > fset2->fid) {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
i1++;
i2++;
n1 = TARRAY2_SIZE(&fs->cstate);
} else {
// edit
code = tsdbTFileSetEditEx(fset2, fset1);
......@@ -272,18 +269,15 @@ static int32_t apply_commit(STFileSystem *fs) {
i2++;
}
} else if (fset1) {
// delete fset1
TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear);
n1 = TARRAY2_SIZE(&fs->cstate);
// delete fset1 (TODO: should set file remove)
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear);
} else {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
i1++;
i2++;
n1 = TARRAY2_SIZE(&fs->cstate);
}
}
......@@ -474,33 +468,34 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe
return 0;
}
static int32_t edit_fs(TFileSetArray *fset_arr, const TFileOpArray *op_arr) {
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
int32_t code = 0;
int32_t lino = 0;
TFileSetArray *fsetArray = &fs->nstate;
STFileSet *fset = NULL;
const STFileOp *op;
TARRAY2_FOREACH_PTR(op_arr, op) {
TARRAY2_FOREACH_PTR(opArray, op) {
if (!fset || fset->fid != op->fid) {
STFileSet tfset = {.fid = op->fid};
fset = &tfset;
fset = TARRAY2_SEARCH(fset_arr, &fset, tsdbTFileSetCmprFn, TD_EQ);
fset = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
if (!fset) {
code = tsdbTFileSetInit(op->fid, &fset);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_SORT_INSERT(fset_arr, fset, tsdbTFileSetCmprFn);
code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = tsdbTFileSetEdit(fset, op);
code = tsdbTFileSetEdit(fs->pTsdb, fset, op);
TSDB_CHECK_CODE(code, lino, _exit);
if (0) {
// TODO check if the file set should be deleted
}
{
// TODO: check if a file set should be deleted
}
_exit:
......@@ -539,7 +534,7 @@ int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) {
return 0;
}
int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) {
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
int32_t code = 0;
int32_t lino;
char current_t[TSDB_FILENAME_LEN];
......@@ -560,7 +555,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype)
fs->etype = etype;
// edit
code = edit_fs(&fs->nstate, NULL /* TODO */);
code = edit_fs(fs, opArray);
TSDB_CHECK_CODE(code, lino, _exit);
// save fs
......
......@@ -46,9 +46,9 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
return 0;
}
static int32_t tsdbSttLvlCmprFn(const SSttLvl *lvl1, const SSttLvl *lvl2) {
if (lvl1->level < lvl2->level) return -1;
if (lvl1->level > lvl2->level) return 1;
static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) {
if (lvl1[0]->level < lvl2[0]->level) return -1;
if (lvl1[0]->level > lvl2[0]->level) return 1;
return 0;
}
......@@ -194,30 +194,45 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
return 0;
}
int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op) {
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
int32_t code = 0;
// if (op->oState.size == 0 //
// || 0 /* TODO*/
// ) {
// STFileObj *fobj;
// // code = tsdbTFileObjCreate(&fobj);
// if (code) return code;
// fobj->f = op->nState;
// add_file_to_fset(fset, fobj);
// } else if (op->nState.size == 0) {
// // delete
// ASSERT(0);
// } else {
// // modify
// ASSERT(0);
// }
if (op->oState.size == 0 //
|| 0 /* TODO*/
) {
STFileObj *fobj;
code = tsdbTFileObjInit(pTsdb, &op->nState, &fobj);
if (code) return code;
if (fobj->f.type == TSDB_FTYPE_STT) {
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level);
if (!lvl) {
code = tsdbSttLvlInit(fobj->f.stt.level, &lvl);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn);
if (code) return code;
}
code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr);
if (code) return code;
} else {
fset->farr[fobj->f.type] = fobj;
}
} else if (op->nState.size == 0) {
// delete
ASSERT(0);
} else {
// modify
ASSERT(0);
}
return 0;
}
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset) {
ASSERT(fset1->fid == fset->fid);
// TODO
ASSERT(0);
return 0;
}
......@@ -253,7 +268,8 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse
return code;
}
TARRAY2_APPEND(&fset[0]->lvlArr, lvl);
code = TARRAY2_APPEND(&fset[0]->lvlArr, lvl);
if (code) return code;
}
return 0;
......@@ -275,9 +291,9 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
return 0;
}
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
SSttLvl tlvl = {.level = level};
const SSttLvl *lvl = &tlvl;
SSttLvl *lvl = &tlvl;
return TARRAY2_SEARCH(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
}
......
......@@ -275,3 +275,13 @@ int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]) {
}
return 0;
}
int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2) {
if (fobj1[0]->f.cid < fobj2[0]->f.cid) {
return -1;
} else if (fobj1[0]->f.cid > fobj2[0]->f.cid) {
return 1;
} else {
return 0;
}
}
\ No newline at end of file
......@@ -472,6 +472,7 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(pWriter->config.pTsdb->pVnode);
char fname[TSDB_FILENAME_LEN];
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
......@@ -479,7 +480,8 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) {
flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
}
code = tsdbOpenFile(NULL /*pWriter->config.file.fname*/, pWriter->config.szPage, flag, &pWriter->pFd);
tsdbTFileName(pWriter->config.pTsdb, &pWriter->config.file, fname);
code = tsdbOpenFile(fname, pWriter->config.szPage, flag, &pWriter->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->tFile.size == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册