提交 b07ad168 编写于 作者: H Hongze Cheng

more code

上级 9156b297
...@@ -30,7 +30,8 @@ typedef TARRAY2(SSttLvl *) TSttLvlArray; ...@@ -30,7 +30,8 @@ typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray; typedef TARRAY2(STFileOp) TFileOpArray;
typedef enum { typedef enum {
TSDB_FOP_CREATE = 1, TSDB_FOP_NONE = 0,
TSDB_FOP_CREATE,
TSDB_FOP_REMOVE, TSDB_FOP_REMOVE,
TSDB_FOP_MODIFY, TSDB_FOP_MODIFY,
} tsdb_fop_t; } tsdb_fop_t;
......
...@@ -56,7 +56,8 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig; ...@@ -56,7 +56,8 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig;
int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **ppWriter); int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **ppWriter);
int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op); int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op);
int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo);
int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData);
int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData); int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData);
struct SSttFileWriterConfig { struct SSttFileWriterConfig {
......
...@@ -133,7 +133,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) { ...@@ -133,7 +133,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
} }
} }
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, SRowInfo *pRowInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t vid = TD_VID(pCommitter->pTsdb->pVnode); int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
...@@ -143,15 +143,12 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB ...@@ -143,15 +143,12 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow); code = tsdbSttFWriteTSData(pCommitter->pWriter, pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code)); tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, vid, __func__,
pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version);
} }
return 0; return 0;
} }
...@@ -177,6 +174,7 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { ...@@ -177,6 +174,7 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) {
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
STbDataIter iter; STbDataIter iter;
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
SRowInfo rowInfo = {.suid = pTbData->suid, .uid = pTbData->uid};
tsdbTbDataIterOpen(pTbData, &from, 0, &iter); tsdbTbDataIterOpen(pTbData, &from, 0, &iter);
...@@ -188,7 +186,8 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) { ...@@ -188,7 +186,8 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) {
break; break;
} }
code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow); rowInfo.row = *pRow;
code = tsdbCommitWriteTSData(pCommitter, &rowInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
nRow++; nRow++;
......
...@@ -20,10 +20,14 @@ typedef struct { ...@@ -20,10 +20,14 @@ typedef struct {
bool toData; bool toData;
int32_t level; int32_t level;
STFileSet *fset; STFileSet *fset;
SRowInfo *pRowInfo;
SBlockData bData;
} SMergeCtx; } SMergeCtx;
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;
// context
SMergeCtx ctx;
// config // config
int32_t maxRow; int32_t maxRow;
int32_t szPage; int32_t szPage;
...@@ -32,9 +36,6 @@ typedef struct { ...@@ -32,9 +36,6 @@ typedef struct {
SSkmInfo skmTb; SSkmInfo skmTb;
SSkmInfo skmRow; SSkmInfo skmRow;
uint8_t *aBuf[5]; uint8_t *aBuf[5];
// context
SMergeCtx ctx;
// reader // reader
TARRAY2(SSttFileReader *) sttReaderArr; TARRAY2(SSttFileReader *) sttReaderArr;
SDataFileReader *dataReader; SDataFileReader *dataReader;
...@@ -76,11 +77,62 @@ _exit: ...@@ -76,11 +77,62 @@ _exit:
return 0; return 0;
} }
static int32_t tsdbDoMergeFileSet(SMerger *merger) { static int32_t tsdbMergeNextRow(SMerger *merger) {
// TODO // TODO
return 0; return 0;
} }
static int32_t tsdbMergeToData(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
for (;;) {
code = tsdbMergeNextRow(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx.pRowInfo) break;
code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (merger->ctx.bData.nRow >= merger->maxRow) {
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData);
// TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(&merger->ctx.bData);
}
}
_exit:
if (code) {
tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
for (;;) {
code = tsdbMergeNextRow(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx.pRowInfo) break;
code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx.pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbMergeFileSetBegin(SMerger *merger) { static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -164,8 +216,29 @@ _exit: ...@@ -164,8 +216,29 @@ _exit:
return code; return code;
} }
static int32_t tsdbMergeFileSetEnd(SMerger *merger) { static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
// TODO int32_t code = 0;
return 0; int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
STFileOp op;
code = tsdbSttFWriterClose(&merger->sttWriter, 0, &op);
TSDB_CHECK_CODE(code, lino, _exit);
if (op.optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND(&merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx.toData) {
// code = tsdbDataFWriterClose();
// TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
}
return code;
} }
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
int32_t code = 0; int32_t code = 0;
...@@ -181,8 +254,14 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { ...@@ -181,8 +254,14 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
code = tsdbMergeFileSetBegin(merger); code = tsdbMergeFileSetBegin(merger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDoMergeFileSet(merger); // do merge
TSDB_CHECK_CODE(code, lino, _exit); if (merger->ctx.toData) {
code = tsdbMergeToData(merger);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbMergeToUpperLevel(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeFileSetEnd(merger); code = tsdbMergeFileSetEnd(merger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -601,25 +601,23 @@ _exit: ...@@ -601,25 +601,23 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
TSDBKEY key = TSDBROW_KEY(pRow); TABLEID *tbid = (TABLEID *)pRowInfo;
TSDBROW *pRow = &pRowInfo->row;
TSDBKEY key = TSDBROW_KEY(pRow);
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) { if (pWriter->bData.nRow > 0) {
TSDB_CHECK_CODE( // code = write_timeseries_block(pWriter);
code = write_timeseries_block(pWriter), // TSDB_CHECK_CODE(code, lino, _exit);
lino, //
_exit);
} }
if (pWriter->sData.nRow >= pWriter->config.maxRow) { if (pWriter->sData.nRow >= pWriter->config.maxRow) {
TSDB_CHECK_CODE( // code = write_statistics_block(pWriter);
code = write_statistics_block(pWriter), // TSDB_CHECK_CODE(code, lino, _exit);
lino, //
_exit);
} }
pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid
...@@ -631,54 +629,28 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo ...@@ -631,54 +629,28 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo
pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count
pWriter->sData.nRow++; pWriter->sData.nRow++;
TSDB_CHECK_CODE( // code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb);
code = tsdbUpdateSkmTb( // TSDB_CHECK_CODE(code, lino, _exit);
pWriter->config.pTsdb, //
tbid, //
pWriter->config.pSkmTb), //
lino, //
_exit);
TABLEID id = {.suid = tbid->suid, // TABLEID id = {
.uid = tbid->suid // .suid = tbid->suid,
? 0 .uid = tbid->uid ? 0 : tbid->uid,
: tbid->uid}; };
TSDB_CHECK_CODE( // code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0);
code = tBlockDataInit( // TSDB_CHECK_CODE(code, lino, _exit);
&pWriter->bData, //
&id, //
pWriter->config.pSkmTb->pTSchema, //
NULL, //
0), //
lino, //
_exit);
} }
if (pRow->type == TSDBROW_ROW_FMT) { if (pRowInfo->row.type == TSDBROW_ROW_FMT) {
TSDB_CHECK_CODE( // code = tsdbUpdateSkmRow(pWriter->config.pTsdb, tbid, TSDBROW_SVERSION(pRow), pWriter->config.pSkmRow);
code = tsdbUpdateSkmRow( // TSDB_CHECK_CODE(code, lino, _exit);
pWriter->config.pTsdb, //
tbid, //
TSDBROW_SVERSION(pRow), //
pWriter->config.pSkmRow), //
lino, //
_exit);
} }
TSDB_CHECK_CODE( // code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid);
code = tBlockDataAppendRow( // TSDB_CHECK_CODE(code, lino, _exit);
&pWriter->bData, //
pRow, //
pWriter->config.pSkmRow->pTSchema, //
tbid->uid), //
lino, //
_exit);
if (pWriter->bData.nRow >= pWriter->config.maxRow) { if (pWriter->bData.nRow >= pWriter->config.maxRow) {
TSDB_CHECK_CODE( // code = write_timeseries_block(pWriter);
code = write_timeseries_block(pWriter), // TSDB_CHECK_CODE(code, lino, _exit);
lino, //
_exit);
} }
if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) {
...@@ -694,16 +666,17 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo ...@@ -694,16 +666,17 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRo
_exit: _exit:
if (code) { if (code) {
tsdbError( // tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
"vgId:%d %s failed at line %d since %s", // tstrerror(code));
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} }
return code; return code;
} }
int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) {
// TODO
return 0;
}
int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData) {
ASSERTS(0, "TODO: Not implemented yet"); ASSERTS(0, "TODO: Not implemented yet");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册