提交 71df0a00 编写于 作者: H Hongze Cheng

more code

上级 438d0cae
...@@ -39,6 +39,7 @@ typedef struct { ...@@ -39,6 +39,7 @@ typedef struct {
TSKEY nextKey; TSKEY nextKey;
int32_t fid; int32_t fid;
int32_t expLevel; int32_t expLevel;
SDiskID did;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
STFileSet *fset; STFileSet *fset;
...@@ -60,12 +61,6 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { ...@@ -60,12 +61,6 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SDiskID did[1];
if (tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
SSttFileWriterConfig config[1] = {{ SSttFileWriterConfig config[1] = {{
.tsdb = committer->tsdb, .tsdb = committer->tsdb,
.maxRow = committer->maxRow, .maxRow = committer->maxRow,
...@@ -75,7 +70,7 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) { ...@@ -75,7 +70,7 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
.file = .file =
{ {
.type = TSDB_FTYPE_STT, .type = TSDB_FTYPE_STT,
.did = did[0], .did = committer->ctx->did,
.fid = committer->ctx->fid, .fid = committer->ctx->fid,
.cid = committer->ctx->cid, .cid = committer->ctx->cid,
}, },
...@@ -122,31 +117,62 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) { ...@@ -122,31 +117,62 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// if (committer->sttTrigger == 1) {
// SDataFileWriterConfig config = {
// // TODO
// };
// code = tsdbDataFileWriterOpen(&config, &committer->dataWriter);
// TSDB_CHECK_CODE(code, lino, _exit);
// // TODO
// }
// stt writer // stt writer
if (!committer->ctx->fset) { if (committer->ctx->fset == NULL) {
return tsdbCommitOpenNewSttWriter(committer); code = tsdbCommitOpenNewSttWriter(committer);
} TSDB_CHECK_CODE(code, lino, _exit);
} else {
const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0); const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0);
if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) { if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) {
return tsdbCommitOpenNewSttWriter(committer); code = tsdbCommitOpenNewSttWriter(committer);
} TSDB_CHECK_CODE(code, lino, _exit);
} else {
STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr); STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr);
if (fobj->f->stt->nseg >= committer->sttTrigger) { if (fobj->f->stt->nseg >= committer->sttTrigger) {
return tsdbCommitOpenNewSttWriter(committer); code = tsdbCommitOpenNewSttWriter(committer);
TSDB_CHECK_CODE(code, lino, _exit);
if (committer->sttTrigger == 1) {
SSttFileReaderConfig sttFileReaderConfig = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
code = tsdbSttFileReaderOpen(NULL, &sttFileReaderConfig, &committer->sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else { } else {
return tsdbCommitOpenExistSttWriter(committer, fobj->f); code = tsdbCommitOpenExistSttWriter(committer, fobj->f);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
// data writer
if (committer->sttTrigger == 1) {
// data writer
SDataFileWriterConfig config = {
.tsdb = committer->tsdb,
.cmprAlg = committer->cmprAlg,
.maxRow = committer->maxRow,
.szPage = committer->szPage,
.fid = committer->ctx->fid,
.cid = committer->ctx->cid,
.did = committer->ctx->did,
.compactVersion = committer->compactVersion,
};
if (committer->ctx->fset) {
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) {
if (committer->ctx->fset->farr[ftype] != NULL) {
config.files[ftype].exist = true;
config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0];
}
}
}
code = tsdbDataFileWriterOpen(&config, &committer->dataWriter);
TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
...@@ -156,13 +182,6 @@ _exit: ...@@ -156,13 +182,6 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
int64_t eKey) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) { static int32_t tsdbCommitTSDataOpenIterMerger(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -455,54 +474,6 @@ _exit: ...@@ -455,54 +474,6 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitTombDataToStt(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTombDataToData(SCommitter2 *committer) {
int32_t code = 0;
int32_t lino = 0;
if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbDataFileWriteTombRecord(committer->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) { static int32_t tsdbCommitTombDataOpenIter(SCommitter2 *committer) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -561,13 +532,23 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { ...@@ -561,13 +532,23 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
code = tsdbCommitTombDataOpenIter(committer); code = tsdbCommitTombDataOpenIter(committer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (committer->sttTrigger > 1) { if (committer->dataWriter == NULL || tsdbSttFileWriterIsOpened(committer->sttWriter)) {
code = tsdbCommitTombDataToStt(committer); for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbSttFileWriteTombRecord(committer->sttWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
} else { } else {
code = tsdbCommitTombDataToData(committer); for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->iterMerger));) {
code = tsdbDataFileWriteTombRecord(committer->dataWriter, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(committer->iterMerger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
}
code = tsdbCommitTombDataCloseIter(committer); code = tsdbCommitTombDataCloseIter(committer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -588,6 +569,8 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { ...@@ -588,6 +569,8 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
&committer->ctx->maxKey); &committer->ctx->maxKey);
code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet fset = {.fid = committer->ctx->fid}; STFileSet fset = {.fid = committer->ctx->fid};
committer->ctx->fset = &fset; committer->ctx->fset = &fset;
committer->ctx->fset = TARRAY2_SEARCH_EX(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ); committer->ctx->fset = TARRAY2_SEARCH_EX(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
......
...@@ -661,6 +661,8 @@ int32_t tsdbMerge(void *arg) { ...@@ -661,6 +661,8 @@ int32_t tsdbMerge(void *arg) {
.sttTrigger = tsdb->pVnode->config.sttTrigger, .sttTrigger = tsdb->pVnode->config.sttTrigger,
}}; }};
ASSERT(merger->sttTrigger > 1);
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr); code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册