提交 36b2495c 编写于 作者: H Hongze Cheng

more code

上级 5d5add3f
......@@ -29,14 +29,6 @@ typedef TARRAY2(STFileObj *) TFileObjArray;
typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray;
typedef enum {
TSDB_FOP_NONE = 0,
TSDB_FOP_EXTEND,
TSDB_FOP_CREATE,
TSDB_FOP_DELETE,
TSDB_FOP_TRUNCATE,
} tsdb_fop_t;
// init/clear
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
......@@ -52,14 +44,16 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset);
// max commit id
int64_t tsdbTFileSetMaxCid(const STFileSet *fset);
// get
SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level);
// is empty
bool tsdbTFileSetIsEmpty(const STFileSet *fset);
struct STFileOp {
tsdb_fop_t op;
int32_t fid;
STFile oState; // old file state
STFile nState; // new file state
int32_t fid;
STFile *of; // old file
STFile *nf; // new file
STFile fArr[2];
};
struct SSttLvl {
......
......@@ -326,7 +326,7 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = 1; // TODO
pCommitter->sttTrigger = 4; // TODO
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) {
......
......@@ -496,8 +496,15 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
TSDB_CHECK_CODE(code, lino, _exit);
}
{
// TODO: check if a file set should be deleted
// remove empty file set
int32_t i = 0;
while (i < TARRAY2_SIZE(fsetArray)) {
fset = TARRAY2_GET(fsetArray, i);
if (tsdbTFileSetIsEmpty(fset)) {
TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
} else {
i++;
}
}
_exit:
......
......@@ -259,35 +259,35 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
int32_t code = 0;
if (op->oState.size == 0 //
|| 0 /* TODO*/
) {
STFileObj *fobj;
code = tsdbTFileObjInit(pTsdb, &op->nState, &fobj);
if (code) return code;
if (fobj->f.type == TSDB_FTYPE_STT) {
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level);
if (!lvl) {
code = tsdbSttLvlInit(fobj->f.stt.level, &lvl);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn);
if (code) return code;
}
code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr);
if (code) return code;
} else {
fset->farr[fobj->f.type] = fobj;
}
} else if (op->nState.size == 0) {
// delete
ASSERT(0);
} else {
// modify
ASSERT(0);
}
// if (op->of.size == 0 //
// || 0 /* TODO*/
// ) {
// STFileObj *fobj;
// code = tsdbTFileObjInit(pTsdb, &op->nf, &fobj);
// if (code) return code;
// if (fobj->f.type == TSDB_FTYPE_STT) {
// SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level);
// if (!lvl) {
// code = tsdbSttLvlInit(fobj->f.stt.level, &lvl);
// if (code) return code;
// code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn);
// if (code) return code;
// }
// code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr);
// if (code) return code;
// } else {
// fset->farr[fobj->f.type] = fobj;
// }
// } else if (op->nf.size == 0) {
// // delete
// ASSERT(0);
// } else {
// // modify
// ASSERT(0);
// }
return 0;
}
......@@ -457,4 +457,11 @@ int64_t tsdbTFileSetMaxCid(const STFileSet *fset) {
TARRAY2_FOREACH(&lvl->farr, fobj) { maxCid = TMAX(maxCid, fobj->f.cid); }
}
return maxCid;
}
bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] != NULL) return false;
}
return TARRAY2_SIZE(&fset->lvlArr) == 0;
}
\ No newline at end of file
......@@ -570,14 +570,16 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFi
TSDB_CHECK_CODE(code, lino, _exit);
if (op) {
op->fid = ppWriter[0]->config.file.fid;
op->oState = ppWriter[0]->config.file;
op->nState = ppWriter[0]->tFile;
if (op->oState.size == 0) {
op->op = TSDB_FOP_CREATE;
STFile *f = &ppWriter[0]->config.file;
op->fid = f->fid;
if (f->size == 0) {
op->of = NULL;
} else {
op->op = TSDB_FOP_EXTEND;
op->of = &op->fArr[0];
op->of[0] = f[0];
}
op->nf = &op->fArr[1];
op->nf[0] = ppWriter[0]->tFile;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册