提交 85c4c610 编写于 作者: H Hongze Cheng

more code

上级 5f81daa4
......@@ -42,11 +42,11 @@ typedef void (*TArray2Cb)(void *);
#define TARRAY2_INITIALIZER \
{ 0, 0, NULL }
#define TARRAY2_SIZE(a) ((a)->size)
#define TARRAY2_ELEM(a, i) ((a)->data[i])
#define TARRAY2_ELEM_PTR(a, i) (&((a)->data[i]))
#define TARRAY2_FIRST(a) ((a)->data[0])
#define TARRAY2_LAST(a) ((a)->data[(a)->size - 1])
#define TARRAY2_SIZE(a) ((a)->size)
#define TARRAY2_GET(a, i) ((a)->data[i])
#define TARRAY2_GET_PTR(a, i) (&((a)->data[i]))
#define TARRAY2_FIRST(a) ((a)->data[0])
#define TARRAY2_LAST(a) ((a)->data[(a)->size - 1])
static FORCE_INLINE int32_t tarray2_make_room( //
void *arg, // array
......
......@@ -49,7 +49,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
// edit
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset);
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset);
// max commit id
int64_t tsdbTFileSetMaxCid(const STFileSet *fset);
......
......@@ -45,6 +45,8 @@ enum {
int32_t tsdbTFileToJson(const STFile *f, cJSON *json);
int32_t tsdbJsonToTFile(const cJSON *json, tsdb_ftype_t ftype, STFile *f);
int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]);
bool tsdbIsSameTFile(const STFile *f1, const STFile *f2);
bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2);
// STFileObj
int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj);
......
......@@ -326,7 +326,7 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = 4; // TODO
pCommitter->sttTrigger = 1; // TODO
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) {
......
......@@ -247,8 +247,8 @@ static int32_t apply_commit(STFileSystem *fs) {
int32_t i1 = 0, i2 = 0;
while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_ELEM(fsetArray1, i1) : NULL;
STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_ELEM(fsetArray2, i2) : NULL;
STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;
if (fset1 && fset2) {
if (fset1->fid < fset2->fid) {
......@@ -264,7 +264,7 @@ static int32_t apply_commit(STFileSystem *fs) {
i2++;
} else {
// edit
code = tsdbTFileSetEditEx(fset2, fset1);
code = tsdbTFileSetApplyEdit(fs->pTsdb, fset2, fset1);
if (code) return code;
i1++;
i2++;
......
......@@ -55,6 +55,59 @@ static void tsdbSttLvlRemove(SSttLvl **lvl) {
lvl[0] = NULL;
}
static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *lvl2) {
int32_t code = 0;
ASSERT(lvl1->level == lvl2->level);
int32_t i1 = 0, i2 = 0;
while (i1 < TARRAY2_SIZE(&lvl1->farr) || i2 < TARRAY2_SIZE(&lvl2->farr)) {
STFileObj *fobj1 = i1 < TARRAY2_SIZE(&lvl1->farr) ? TARRAY2_GET(&lvl1->farr, i1) : NULL;
STFileObj *fobj2 = i2 < TARRAY2_SIZE(&lvl2->farr) ? TARRAY2_GET(&lvl2->farr, i2) : NULL;
if (fobj1 && fobj2) {
if (fobj1->f.cid < fobj2->f.cid) {
// create a file obj
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(&lvl2->farr, fobj2);
if (code) return code;
i1++;
i2++;
} else if (fobj1->f.cid > fobj2->f.cid) {
// remove a file obj
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
} else {
if (tsdbIsSameTFile(&fobj1->f, &fobj2->f)) {
if (tsdbIsTFileChanged(&fobj1->f, &fobj2->f)) {
fobj2->f = fobj1->f;
}
} else {
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&lvl2->farr, fobj2, tsdbTFileObjCmpr);
if (code) return code;
}
i1++;
i2++;
}
} else if (fobj1) {
// create a file obj
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(&lvl2->farr, fobj2);
if (code) return code;
i1++;
i2++;
} else {
// remove a file obj
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
}
}
return 0;
}
static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) {
if (lvl1[0]->level < lvl2[0]->level) return -1;
if (lvl1[0]->level > lvl2[0]->level) return 1;
......@@ -239,9 +292,77 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
return 0;
}
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset) {
ASSERT(fset1->fid == fset->fid);
ASSERT(0);
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset2) {
int32_t code = 0;
ASSERT(fset1->fid == fset2->fid);
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
STFileObj *fobj1 = fset1->farr[ftype];
STFileObj *fobj2 = fset2->farr[ftype];
if (!fobj1 && !fobj2) continue;
if (fobj1 && fobj2) {
if (tsdbIsSameTFile(&fobj1->f, &fobj2->f)) {
if (tsdbIsTFileChanged(&fobj1->f, &fobj2->f)) {
fobj2->f = fobj1->f;
}
} else {
tsdbTFileObjRemove(fobj2);
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fset2->farr[ftype]);
if (code) return code;
}
} else if (fobj1) {
// create a new file
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fset2->farr[ftype]);
if (code) return code;
} else {
// remove the file
tsdbTFileObjRemove(fobj2);
fset2->farr[ftype] = NULL;
}
}
// stt part
int32_t i1 = 0, i2 = 0;
while (i1 < TARRAY2_SIZE(&fset1->lvlArr) || i2 < TARRAY2_SIZE(&fset2->lvlArr)) {
SSttLvl *lvl1 = i1 < TARRAY2_SIZE(&fset1->lvlArr) ? TARRAY2_GET(&fset1->lvlArr, i1) : NULL;
SSttLvl *lvl2 = i2 < TARRAY2_SIZE(&fset2->lvlArr) ? TARRAY2_GET(&fset2->lvlArr, i2) : NULL;
if (lvl1 && lvl2) {
if (lvl1->level < lvl2->level) {
// add a new stt level
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
if (code) return code;
i1++;
i2++;
} else if (lvl1->level > lvl2->level) {
// remove the stt level
TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove);
} else {
// apply edit on stt level
code = tsdbSttLvlApplyEdit(pTsdb, lvl1, lvl2);
if (code) return code;
i1++;
i2++;
}
} else if (lvl1) {
// add a new stt level
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
if (code) return code;
i1++;
i2++;
} else {
// remove the stt level
TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove);
}
}
return 0;
}
......
......@@ -279,6 +279,21 @@ int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]) {
return 0;
}
bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
if (f1->type != f2->type) return false;
if (f1->did.level != f2->did.level) return false;
if (f1->did.id != f2->did.id) return false;
if (f1->fid != f2->fid) return false;
if (f1->cid != f2->cid) return false;
return true;
}
bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2) {
if (f1->size != f2->size) return true;
if (f1->type == TSDB_FTYPE_STT && f1->stt.nseg != f2->stt.nseg) return true;
return false;
}
int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2) {
if (fobj1[0]->f.cid < fobj2[0]->f.cid) {
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册