提交 bdeb1ebc 编写于 作者: H Hongze Cheng

compare

上级 ce087d3d
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
#define USE_STREAM_COMPRESSION 1
typedef struct { typedef struct {
SRBTreeNode n; SRBTreeNode n;
SRowInfo r; SRowInfo r;
...@@ -70,12 +72,16 @@ typedef struct { ...@@ -70,12 +72,16 @@ typedef struct {
int8_t toLastOnly; int8_t toLastOnly;
}; };
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aSttBlk; // SArray<SSttBlk> SArray *aSttBlk; // SArray<SSttBlk>
SMapData mBlock; // SMapData<SDataBlk> SMapData mBlock; // SMapData<SDataBlk>
SBlockData bData; SBlockData bData;
#if USE_STREAM_COMPRESSION
SDiskDataBuilder *pBuilder; SDiskDataBuilder *pBuilder;
#else
SBlockData bDatal;
#endif
} dWriter; } dWriter;
SSkmInfo skmTable; SSkmInfo skmTable;
SSkmInfo skmRow; SSkmInfo skmRow;
...@@ -542,7 +548,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -542,7 +548,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->dWriter.aSttBlk); taosArrayClear(pCommitter->dWriter.aSttBlk);
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bData);
#if USE_STREAM_COMPRESSION
tDiskDataBuilderClear(pCommitter->dWriter.pBuilder); tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
#else
tBlockDataReset(&pCommitter->dWriter.bDatal);
#endif
// open iter // open iter
code = tsdbOpenCommitIter(pCommitter); code = tsdbOpenCommitIter(pCommitter);
...@@ -860,7 +870,11 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -860,7 +870,11 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
code = tBlockDataCreate(&pCommitter->dWriter.bData); code = tBlockDataCreate(&pCommitter->dWriter.bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
#if USE_STREAM_COMPRESSION
code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder); code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
#else
code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
#endif
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
...@@ -889,7 +903,11 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -889,7 +903,11 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->dWriter.aSttBlk); taosArrayDestroy(pCommitter->dWriter.aSttBlk);
tMapDataClear(&pCommitter->dWriter.mBlock); tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataDestroy(&pCommitter->dWriter.bData, 1); tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
#if USE_STREAM_COMPRESSION
tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder); tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
#else
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }
...@@ -1360,6 +1378,7 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) ...@@ -1360,6 +1378,7 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id)
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
#if USE_STREAM_COMPRESSION
SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder; SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
if (pBuilder->suid || pBuilder->uid) { if (pBuilder->suid || pBuilder->uid) {
if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) { if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
...@@ -1376,6 +1395,25 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) ...@@ -1376,6 +1395,25 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id)
code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#else
SBlockData *pBData = &pCommitter->dWriter.bDatal;
if (pBData->suid || pBData->uid) {
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(pBData);
}
}
if (!pBData->suid && !pBData->uid) {
ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid);
TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
_exit: _exit:
if (code) { if (code) {
...@@ -1398,6 +1436,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1398,6 +1436,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
#if USE_STREAM_COMPRESSION
code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1408,6 +1447,16 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1408,6 +1447,16 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#else
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
} }
_exit: _exit:
...@@ -1441,7 +1490,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1441,7 +1490,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
pTSchema = pCommitter->skmRow.pTSchema; pTSchema = pCommitter->skmRow.pTSchema;
} }
#if USE_STREAM_COMPRESSION
code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
#else
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid);
#endif
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
...@@ -1452,6 +1505,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1452,6 +1505,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
pRowInfo = NULL; pRowInfo = NULL;
} }
#if USE_STREAM_COMPRESSION
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1459,6 +1513,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1459,6 +1513,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#else
if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
} }
} else { } else {
SBlockData *pBData = &pCommitter->dWriter.bData; SBlockData *pBData = &pCommitter->dWriter.bData;
...@@ -1561,7 +1622,12 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1561,7 +1622,12 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
#if USE_STREAM_COMPRESSION
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
#else
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
#endif
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册