提交 41459a92 编写于 作者: H Hongze Cheng

more code

上级 6aac0e0c
......@@ -76,9 +76,8 @@ typedef struct STsdbFilterInfo STsdbFilterInfo;
#define TSDBROW_ROW_FMT ((int8_t)0x0)
#define TSDBROW_COL_FMT ((int8_t)0x1)
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_FHDR_SIZE 512
#define VERSION_MIN 0
#define VERSION_MAX INT64_MAX
......@@ -498,7 +497,7 @@ struct SDataBlk {
int32_t nRow;
int8_t hasDup;
int8_t nSubBlock;
SBlockInfo aSubBlock[TSDB_MAX_SUBBLOCKS];
SBlockInfo aSubBlock[1];
SSmaInfo smaInfo;
};
......
......@@ -47,13 +47,14 @@ int32_t tsdbDataFileReadDelData(SDataFileReader *reader, const SDelBlk *delBlk,
// SDataFileWriter =============================================
typedef struct SDataFileWriter SDataFileWriter;
typedef struct SDataFileWriterConfig {
STsdb *tsdb;
STFile f[TSDB_FTYPE_MAX];
STsdb *tsdb;
STFile f[TSDB_FTYPE_MAX];
int32_t maxRow;
} SDataFileWriterConfig;
int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer);
int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]);
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData);
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData);
#ifdef __cplusplus
}
......
......@@ -76,16 +76,21 @@ int32_t tsdbDataFileReadDataBlk(SDataFileReader *reader, const SBlockIdx *blockI
return 0;
}
int32_t tsdbDataFileReadDataBlock(SDataFileReader *reader, const SDataBlk *dataBlk, SBlockData *bData) {
// TODO
return 0;
}
// SDataFileWriter =============================================
struct SDataFileWriter {
SDataFileWriterConfig config[1];
struct {
bool opened;
bool tbHasOldData;
SDataFileReader *reader;
const TBlockIdxArray *blockIdxArray;
int32_t blockIdxArrayIdx;
bool tbHasOldData;
TABLEID tbid[1];
const TDataBlkArray *dataBlkArray;
int32_t dataBlkArrayIdx;
......@@ -244,6 +249,29 @@ static int32_t tsdbDataFileWriteDataBlk(SDataFileWriter *writer, const TDataBlkA
code = TARRAY2_APPEND_PTR(writer->blockIdxArray, blockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
}
return code;
}
static int32_t tsdbDataFileDoWriteTSRow(SDataFileWriter *writer, TSDBROW *row) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (row->type == TSDBROW_ROW_FMT) {
// TODO: udpate row schema
}
code = tBlockDataAppendRow(writer->ctx->bData, row, NULL /* TODO */, writer->ctx->tbid->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->ctx->bData->nRow >= writer->config->maxRow) {
code = tsdbDataFileWriteBlockData(writer, writer->bData);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
......@@ -261,7 +289,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) {
TSDBROW row[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)};
code = tBlockDataAppendRow(writer->bData, row, NULL, writer->ctx->tbid->uid);
code = tsdbDataFileDoWriteTSRow(writer, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -292,6 +320,8 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
writer->ctx->tbHasOldData = false;
for (; writer->ctx->blockIdxArrayIdx < TARRAY2_SIZE(writer->ctx->blockIdxArray); writer->ctx->blockIdxArrayIdx++) {
const SBlockIdx *blockIdx = TARRAY2_GET_PTR(writer->ctx->blockIdxArray, writer->ctx->blockIdxArrayIdx);
......@@ -314,12 +344,14 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
code = tsdbDataFileReadDataBlk(writer->ctx->reader, blockIdx, &writer->ctx->dataBlkArray);
TSDB_CHECK_CODE(code, lino, _exit);
writer->ctx->dataBlkArrayIdx = 0;
} else {
writer->ctx->tbHasOldData = false;
tBlockDataReset(writer->ctx->bData);
writer->ctx->iRow = 0;
writer->ctx->blockIdxArrayIdx++;
}
break;
}
}
writer->ctx->tbid[0] = tbid[0];
_exit:
......@@ -328,19 +360,77 @@ _exit:
}
return code;
}
static int32_t tsdbDataFileDoWriteTableData(SDataFileWriter *writer, SBlockData *bData) {
static int32_t tsdbDataFileDoWriteTableDataRow(SDataFileWriter *writer, TSDBROW *row) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (writer->ctx->tbHasOldData) {
if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray)) {
// TODO
while (writer->ctx->tbHasOldData) {
for (; writer->ctx->iRow < writer->ctx->bData->nRow; writer->ctx->iRow++) {
TSDBROW row1[1] = {tsdbRowFromBlockData(writer->ctx->bData, writer->ctx->iRow)};
int32_t c = tsdbRowCmprFn(row, row1);
ASSERT(c);
if (row > 0) {
code = tsdbDataFileDoWriteTSRow(writer, row1);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
goto _write_row;
}
}
// TODO
for (; writer->ctx->dataBlkArrayIdx < TARRAY2_SIZE(writer->ctx->dataBlkArray); writer->ctx->dataBlkArrayIdx++) {
const SDataBlk *dataBlk = TARRAY2_GET_PTR(writer->ctx->dataBlkArray, writer->ctx->dataBlkArrayIdx);
SDataBlk dataBlk1[1] = {{.minKey = {}, .maxKey = {}}}; // TODO
int32_t c = tDataBlkCmprFn(dataBlk, dataBlk1);
if (c < 0) {
code = tsdbDataFileWriteBlockData(writer, writer->bData);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_PTR(writer->dataBlkArray, dataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c > 0) {
goto _write_row;
} else {
code = tsdbDataFileReadDataBlock(writer->ctx->reader, dataBlk, writer->ctx->bData);
TSDB_CHECK_CODE(code, lino, _exit);
writer->ctx->iRow = 0;
writer->ctx->dataBlkArrayIdx++;
break;
}
}
if (writer->ctx->dataBlkArrayIdx >= TARRAY2_SIZE(writer->ctx->dataBlkArray) //
&& writer->ctx->iRow >= writer->ctx->bData->nRow) {
writer->ctx->tbHasOldData = false;
}
}
_write_row:
code = tsdbDataFileDoWriteTSRow(writer, row);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
}
return code;
}
static int32_t tsdbDataFileDoWriteTableDataBlock(SDataFileWriter *writer, SBlockData *bData) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
if (!writer->ctx->tbHasOldData && writer->bData->nRow == 0) {
code = tsdbDataFileWriteBlockData(writer, bData);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
// code = tsdbDataFileWriteBlockData(writer, bData);
// TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < bData->nRow; i++) {
TSDBROW row[1] = {tsdbRowFromBlockData(bData, i)};
code = tsdbDataFileDoWriteTableDataRow(writer, row);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
......@@ -349,7 +439,7 @@ _exit:
}
return code;
}
int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) {
int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
......@@ -369,7 +459,7 @@ int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) {
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDataFileDoWriteTableData(writer, bData);
code = tsdbDataFileDoWriteTableDataBlock(writer, bData);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册