提交 416d4ac6 编写于 作者: H Hongze Cheng

more code

上级 ff9f56e0
...@@ -551,13 +551,12 @@ _err: ...@@ -551,13 +551,12 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SDataBlk dataBlk;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
SDataBlk dataBlk;
tDataBlkReset(&dataBlk); tDataBlkReset(&dataBlk);
// info // info
...@@ -585,13 +584,12 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ...@@ -585,13 +584,12 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
// write // write
dataBlk.nSubBlock++; dataBlk.nSubBlock++;
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
pCommitter->cmprAlg, 0);
if (code) goto _err; if (code) goto _err;
// put SDataBlk // put SDataBlk
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &dataBlk, tPutDataBlk); code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
// clear // clear
...@@ -600,16 +598,15 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ...@@ -600,16 +598,15 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SSttBlk sstBlk; SSttBlk sstBlk;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
// info // info
sstBlk.suid = pBlockData->suid; sstBlk.suid = pBlockData->suid;
...@@ -628,11 +625,11 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -628,11 +625,11 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
// write // write
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &sstBlk.bInfo, NULL, pCommitter->cmprAlg, 1); code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
if (code) goto _err; if (code) goto _err;
// push SSttBlk // push SSttBlk
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &sstBlk) == NULL) { if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -643,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -643,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -1120,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1120,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBlockData->nRow >= pCommitter->maxRow) { if (pBlockData->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBlockData->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter); if (code) goto _err;
if (code) goto _err;
}
return code; return code;
...@@ -1189,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1189,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1206,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1206,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBDataW->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter); if (code) goto _err;
if (code) goto _err;
}
return code; return code;
...@@ -1302,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1302,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
if (pBDatal->suid || pBDatal->uid) { if (pBDatal->suid || pBDatal->uid) {
if ((pBDatal->suid != id.suid) || (id.suid == 0)) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
if (pBDatal->nRow) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
code = tsdbCommitLastBlock(pCommitter); if (code) goto _exit;
if (code) goto _exit;
}
tBlockDataReset(pBDatal); tBlockDataReset(pBDatal);
} }
} }
...@@ -1337,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1337,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
if (pBDatal->nRow >= pCommitter->maxRow) { if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1389,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1389,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pBData->nRow >= pCommitter->maxRow) { if (pBData->nRow >= pCommitter->maxRow) {
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1400,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1400,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (!pCommitter->toLastOnly && pBData->nRow) { if (!pCommitter->toLastOnly && pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) { if (pBData->nRow > pCommitter->minRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbAppendLastBlock(pCommitter); code = tsdbAppendLastBlock(pCommitter);
...@@ -1466,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1466,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
code = tsdbCommitLastBlock(pCommitter); pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
......
...@@ -1074,6 +1074,9 @@ _err: ...@@ -1074,6 +1074,9 @@ _err:
} }
#endif #endif
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
...@@ -1179,19 +1182,67 @@ static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { ...@@ -1179,19 +1182,67 @@ static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
ASSERT(pWriter->dWriter.pWriter); ASSERT(pWriter->dWriter.pWriter);
// (todo) // todo: end current commit table
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->dReader.pBlockIdx->uid);
if (code) goto _err;
}
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err;
}
if (pWriter->dWriter.mDataBlk.nItem) {
SBlockIdx blockIdx = *pWriter->dReader.pBlockIdx;
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
// copy remain table data // copy remain table data
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
code = tsdbSnapWriteCopyData(pWriter, &id); code = tsdbSnapWriteCopyData(pWriter, &id);
if (code) goto _exit; if (code) goto _err;
if (pWriter->dWriter.sData.nRow > 0) { code =
// TODO: write the last block tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err;
// Indices
code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx);
if (code) goto _err;
code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk);
if (code) goto _err;
code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter);
if (code) goto _err;
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet);
if (code) goto _err;
code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1);
if (code) goto _err;
if (pWriter->dReader.pReader) {
code = tsdbDataFReaderClose(&pWriter->dReader.pReader);
if (code) goto _err;
} }
_exit: _exit:
return code; return code;
_err:
return code;
} }
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册