提交 c7d1c88f 编写于 作者: H Hongze Cheng

more code

上级 c8ba83cb
...@@ -39,7 +39,7 @@ void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z); ...@@ -39,7 +39,7 @@ void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z);
SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey); SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey);
SRBTreeNode *tRBTreeDropMin(SRBTree *pTree); SRBTreeNode *tRBTreeDropMin(SRBTree *pTree);
SRBTreeNode *tRBTreeDropMax(SRBTree *pTree); SRBTreeNode *tRBTreeDropMax(SRBTree *pTree);
SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode); SRBTreeNode *tRBTreeGet(const SRBTree *pTree, const SRBTreeNode *pKeyNode);
// SRBTreeIter ============================================= // SRBTreeIter =============================================
#define tRBTreeIterCreate(tree, ascend) \ #define tRBTreeIterCreate(tree, ascend) \
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
typedef struct STFileSet STFileSet; typedef struct STFileSet STFileSet;
typedef struct STFileOp STFileOp; typedef struct STFileOp STFileOp;
typedef struct SSttLvl SSttLvl;
typedef enum { typedef enum {
TSDB_FOP_NONE = 0, TSDB_FOP_NONE = 0,
...@@ -39,9 +40,10 @@ int32_t tsdbFileSetInit(STFileSet *pSet, int32_t fid); ...@@ -39,9 +40,10 @@ int32_t tsdbFileSetInit(STFileSet *pSet, int32_t fid);
int32_t tsdbFileSetInitEx(const STFileSet *fset1, STFileSet *fset2); int32_t tsdbFileSetInitEx(const STFileSet *fset1, STFileSet *fset2);
int32_t tsdbFileSetClear(STFileSet *pSet); int32_t tsdbFileSetClear(STFileSet *pSet);
int32_t tsdbFileSetEdit(STFileSet *fset, const STFileOp *op); int32_t tsdbFileSetEdit(STFileSet *fset, const STFileOp *op);
int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2); int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2);
const SSttLvl *tsdbFileSetGetLvl(const STFileSet *fset, int32_t level);
struct STFileOp { struct STFileOp {
tsdb_fop_t op; tsdb_fop_t op;
int32_t fid; int32_t fid;
...@@ -49,12 +51,12 @@ struct STFileOp { ...@@ -49,12 +51,12 @@ struct STFileOp {
STFile nState; // new file state STFile nState; // new file state
}; };
typedef struct SSttLvl { struct SSttLvl {
int32_t level; // level int32_t level; // level
int32_t nstt; // number of .stt files on this level int32_t nstt; // number of .stt files on this level
SRBTree sttTree; // .stt file tree, sorted by cid SRBTree sttTree; // .stt file tree, sorted by cid
SRBTreeNode rbtn; SRBTreeNode rbtn;
} SSttLvl; };
struct STFileSet { struct STFileSet {
int32_t fid; int32_t fid;
......
...@@ -61,13 +61,13 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pD ...@@ -61,13 +61,13 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pD
struct SSttFileWriterConfig { struct SSttFileWriterConfig {
STsdb *pTsdb; STsdb *pTsdb;
STFile file;
int32_t maxRow; int32_t maxRow;
int32_t szPage; int32_t szPage;
int8_t cmprAlg; int8_t cmprAlg;
SSkmInfo *pSkmTb; SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow; SSkmInfo *pSkmRow;
uint8_t **aBuf; uint8_t **aBuf;
STFile file;
}; };
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -37,57 +37,106 @@ typedef struct { ...@@ -37,57 +37,106 @@ typedef struct {
int32_t expLevel; int32_t expLevel;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
const STFileSet *pFileSet; const STFileSet *fset;
// writer // writer
SSttFileWriter *pWriter; SSttFileWriter *pWriter;
} SCommitter; } SCommitter;
static int32_t open_committer_writer(SCommitter *pCommitter) { static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
int32_t vid = TD_VID(pTsdb->pVnode); SVnode *pVnode = pTsdb->pVnode;
int32_t vid = TD_VID(pVnode);
SSttFileWriterConfig config;
SDiskID did;
if (tfsAllocDisk(pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
config.pTsdb = pTsdb;
config.maxRow = pCommitter->maxRow;
config.szPage = pVnode->config.tsdbPageSize;
config.cmprAlg = pCommitter->cmprAlg;
config.pSkmTb = NULL;
config.pSkmRow = NULL;
config.aBuf = NULL;
config.file.type = TSDB_FTYPE_STT;
config.file.did = did;
config.file.fid = pCommitter->fid;
config.file.cid = pCommitter->eid;
config.file.size = 0;
config.file.stt.level = 0;
config.file.stt.nseg = 0;
tsdbTFileInit(pTsdb, &config.file);
code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s success", vid, __func__);
}
return code;
}
static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile *pFile) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SVnode *pVnode = pTsdb->pVnode;
int32_t vid = TD_VID(pVnode);
SSttFileWriterConfig config = { SSttFileWriterConfig config = {
.pTsdb = pCommitter->pTsdb, //
.pTsdb = pTsdb,
.maxRow = pCommitter->maxRow, .maxRow = pCommitter->maxRow,
.szPage = pTsdb->pVnode->config.tsdbPageSize, .szPage = pVnode->config.tsdbPageSize,
.cmprAlg = pCommitter->cmprAlg, .cmprAlg = pCommitter->cmprAlg,
.pSkmTb = NULL, .pSkmTb = NULL,
.pSkmRow = NULL, .pSkmRow = NULL,
.aBuf = NULL, .aBuf = NULL,
.file = *pFile //
}; };
if (pCommitter->pFileSet) {
// TODO
ASSERT(0);
} else {
config.file.type = TSDB_FTYPE_STT;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &config.file.did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
config.file.fid = pCommitter->fid;
config.file.cid = pCommitter->eid;
config.file.size = 0;
config.file.stt.level = 0;
config.file.stt.nseg = 0;
tsdbTFileInit(pTsdb, &config.file);
}
code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", vid, __func__, lino, tstrerror(code), pCommitter->fid); tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s success", vid, __func__);
} }
return code; return code;
} }
static int32_t open_committer_writer(SCommitter *pCommitter) {
if (!pCommitter->fset) {
return open_writer_with_new_stt(pCommitter);
}
const SSttLvl *lvl0 = tsdbFileSetGetLvl(pCommitter->fset, 0);
if (lvl0 == NULL) {
return open_writer_with_new_stt(pCommitter);
}
SRBTreeNode *node = tRBTreeMax(&lvl0->sttTree);
if (node == NULL) {
return open_writer_with_new_stt(pCommitter);
} else {
STFileObj *fobj = TCONTAINER_OF(node, STFileObj, rbtn);
if (fobj->f.stt.nseg >= pCommitter->sttTrigger) {
return open_writer_with_new_stt(pCommitter);
} else {
return open_writer_with_exist_stt(pCommitter, &fobj->f);
}
}
}
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
...@@ -211,7 +260,7 @@ static int32_t commit_fset_start(SCommitter *pCommitter) { ...@@ -211,7 +260,7 @@ static int32_t commit_fset_start(SCommitter *pCommitter) {
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->nextKey = TSKEY_MAX; pCommitter->nextKey = TSKEY_MAX;
tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->pFileSet); tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->fset);
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid, tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid,
pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel); pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel);
......
...@@ -284,6 +284,7 @@ static int32_t apply_commit(STFileSystem *fs) { ...@@ -284,6 +284,7 @@ static int32_t apply_commit(STFileSystem *fs) {
if (code) return code; if (code) return code;
i1++; i1++;
n1++; n1++;
i2++;
} else { } else {
// edit // edit
code = apply_commit_upd_fset(fs, fset1, fset2); code = apply_commit_upd_fset(fs, fset1, fset2);
...@@ -302,6 +303,7 @@ static int32_t apply_commit(STFileSystem *fs) { ...@@ -302,6 +303,7 @@ static int32_t apply_commit(STFileSystem *fs) {
if (code) return code; if (code) return code;
i1++; i1++;
n1++; n1++;
i2++;
} }
} }
......
...@@ -295,3 +295,9 @@ int32_t tsdbFileSetClear(STFileSet *pSet) { ...@@ -295,3 +295,9 @@ int32_t tsdbFileSetClear(STFileSet *pSet) {
// TODO // TODO
return 0; return 0;
} }
const SSttLvl *tsdbFileSetGetLvl(const STFileSet *fset, int32_t level) {
SSttLvl tlvl = {.level = level};
SRBTreeNode *node = tRBTreeGet(&fset->lvlTree, &tlvl.rbtn);
return node ? TCONTAINER_OF(node, SSttLvl, rbtn) : NULL;
}
\ No newline at end of file
...@@ -443,7 +443,7 @@ SRBTreeNode *tRBTreeDropMax(SRBTree *pTree) { ...@@ -443,7 +443,7 @@ SRBTreeNode *tRBTreeDropMax(SRBTree *pTree) {
return pNode; return pNode;
} }
SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) { SRBTreeNode *tRBTreeGet(const SRBTree *pTree, const SRBTreeNode *pKeyNode) {
SRBTreeNode *pNode = pTree->root; SRBTreeNode *pNode = pTree->root;
while (pNode != pTree->NIL) { while (pNode != pTree->NIL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册