提交 9156b297 编写于 作者: H Hongze Cheng

more code

上级 fa00d0df
...@@ -16,15 +16,33 @@ ...@@ -16,15 +16,33 @@
#include "inc/tsdbMerge.h" #include "inc/tsdbMerge.h"
typedef struct { typedef struct {
bool launched; bool launched;
bool toData;
int32_t level;
STFileSet *fset;
} SMergeCtx; } SMergeCtx;
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;
SMergeCtx ctx; // config
int32_t maxRow;
int32_t szPage;
int8_t cmprAlg;
int64_t cid;
SSkmInfo skmTb;
SSkmInfo skmRow;
uint8_t *aBuf[5];
// context
SMergeCtx ctx;
// reader
TARRAY2(SSttFileReader *) sttReaderArr;
SDataFileReader *dataReader;
// writer
SSttFileWriter *sttWriter; SSttFileWriter *sttWriter;
SDataFileWriter *dataWriter; SDataFileWriter *dataWriter;
TFileOpArray fopArr; // operations
TFileOpArray fopArr;
} SMerger; } SMerger;
static int32_t tsdbMergerOpen(SMerger *merger) { static int32_t tsdbMergerOpen(SMerger *merger) {
...@@ -58,57 +76,117 @@ _exit: ...@@ -58,57 +76,117 @@ _exit:
return 0; return 0;
} }
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { static int32_t tsdbDoMergeFileSet(SMerger *merger) {
int32_t code = 0; // TODO
int32_t lino = 0; return 0;
}
if (merger->ctx.launched == false) { static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
code = tsdbMergerOpen(merger); int32_t code = 0;
TSDB_CHECK_CODE(code, lino, _exit); int32_t lino = 0;
} int32_t vid = TD_VID(merger->tsdb->pVnode);
STFileSet *fset = merger->ctx.fset;
// prepare the merger file set
SSttLvl *lvl;
STFileObj *fobj;
merger->ctx.toData = true;
merger->ctx.level = 0;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
if (lvl->level != merger->ctx.level) {
lvl = NULL;
break;
}
{ // prepare the merger file set fobj = TARRAY2_GET(&lvl->farr, 0);
SSttLvl *lvl; if (fobj->f.stt.nseg < merger->tsdb->pVnode->config.sttTrigger) {
STFileObj *fobj; merger->ctx.toData = false;
bool mergerToData = true; break;
int32_t level = -1; } else {
ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1);
merger->ctx.level++;
TARRAY2_FOREACH(&fset->lvlArr, lvl) { // open the reader
if (lvl->level - level > 1) { SSttFileReader *reader;
mergerToData = false; // code = tsdbSttFileReaderOpen(&fobj->f.stt, &reader);
break; // TSDB_CHECK_CODE(code, lino, _exit);
}
if (lvl->level == 0) { code = TARRAY2_APPEND(&merger->sttReaderArr, reader);
} else { TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(TARRAY2_SIZE(&lvl->farr) == 1);
fobj = TARRAY2_FIRST(&lvl->farr); // add the operation
} STFileOp op = {
.fid = fobj->f.fid,
.optype = TSDB_FOP_REMOVE,
.of = fobj->f,
};
code = TARRAY2_APPEND(&merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
} }
}
// merge to level // open stt file writer
level = level + 1; SSttFileWriterConfig config = {
lvl = tsdbTFileSetGetLvl(fset, level); .pTsdb = merger->tsdb,
if (lvl == NULL) { .maxRow = merger->maxRow,
// open new stt file to merge to .szPage = merger->szPage,
} else { .cmprAlg = merger->cmprAlg,
// open existing stt file to merge to .pSkmTb = &merger->skmTb,
} .pSkmRow = &merger->skmRow,
.aBuf = merger->aBuf,
};
if (lvl) {
config.file = fobj->f;
} else {
config.file = (STFile){
.type = TSDB_FTYPE_STT,
.did = {.level = 0, .id = 0},
.fid = fset->fid,
.cid = merger->cid,
.size = 0,
.stt = {.level = merger->ctx.level, .nseg = 0},
};
}
code = tsdbSttFWriterOpen(&config, &merger->sttWriter);
TSDB_CHECK_CODE(code, lino, _exit);
if (mergerToData) { // open data file writer
// code = tsdbDataFWriterOpen(SDataFWriter * *ppWriter, STsdb * pTsdb, SDFileSet * pSet); if (merger->ctx.toData) {
// TSDB_CHECK_CODE(code, lino, _exit); // code = tsdbDataFWriterOpen();
} // TSDB_CHECK_CODE(code, lino, _exit);
} }
{ _exit:
// do merge the file set if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} }
return code;
}
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
// TODO
return 0;
}
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
{ // end merge the file set if (merger->ctx.launched == false) {
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
} }
merger->ctx.fset = fset;
code = tsdbMergeFileSetBegin(merger);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDoMergeFileSet(merger);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbMergeFileSetEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册