提交 1b4cfa64 编写于 作者: M Minglei Jin

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -324,7 +324,6 @@ static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, i ...@@ -324,7 +324,6 @@ static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, i
} }
} }
_update_schema:
pCommitter->suid = suid; pCommitter->suid = suid;
pCommitter->uid = uid; pCommitter->uid = uid;
tTSchemaDestroy(pCommitter->pTSchema); tTSchemaDestroy(pCommitter->pTSchema);
...@@ -341,15 +340,22 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat ...@@ -341,15 +340,22 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat
int8_t toDataOnly) { int8_t toDataOnly) {
int32_t code = 0; int32_t code = 0;
if (pBlock->nSubBlock == 0) {
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
pBlock->last = 1; pBlock->last = 1;
} else { } else {
pBlock->last = 0; pBlock->last = 0;
} }
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
#if 0
code = tsdbWriteBlockSMA(pCommitter, pBlockData, pBlock);
if (code) goto _err;
#endif
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
...@@ -446,16 +452,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB ...@@ -446,16 +452,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
} }
_write_block: _write_block:
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly);
pBlock->last = 1;
} else {
pBlock->last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
tBlockReset(pBlock); tBlockReset(pBlock);
...@@ -491,7 +488,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -491,7 +488,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
} }
// update schema // update schema
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); code = tsdbCommitterUpdateSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
// append // append
...@@ -506,17 +503,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter ...@@ -506,17 +503,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
continue; continue;
_write_block: _write_block:
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
pBlock->last = 1;
} else {
pBlock->last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, &(SBlockIdx){.suid = suid, .uid = uid},
pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
tBlockReset(pBlock); tBlockReset(pBlock);
...@@ -538,12 +525,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S ...@@ -538,12 +525,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
if (code) goto _err; if (code) goto _err;
tBlockReset(&pCommitter->nBlock); tBlockReset(&pCommitter->nBlock);
pCommitter->nBlock.last = pBlock->last; code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &pCommitter->nBlock, pBlockIdx, 0);
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
...@@ -639,11 +621,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S ...@@ -639,11 +621,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
code = tBlockCopy(pBlock, &pCommitter->nBlock); code = tBlockCopy(pBlock, &pCommitter->nBlock);
if (code) goto _err; if (code) goto _err;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, code = tsdbCommitBlockData(pCommitter, pBlockData, &pCommitter->nBlock, pBlockIdx, 0);
pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
if (code) goto _err; if (code) goto _err;
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册