提交 43dec55f 编写于 作者: H Hongze Cheng

more work

上级 04804e38
...@@ -244,15 +244,16 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); ...@@ -244,15 +244,16 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL); int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf); int8_t cmprAlg, int8_t toLast);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader // SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
......
...@@ -760,7 +760,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -760,7 +760,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
if (pDelFile) { if (pDelFile) {
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
if (code) goto _err; if (code) goto _err;
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
......
...@@ -531,8 +531,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { ...@@ -531,8 +531,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
// write // write
block.nSubBlock++; block.nSubBlock++;
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1], code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1],
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0, ((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0);
NULL);
if (code) goto _err; if (code) goto _err;
// put SBlock // put SBlock
...@@ -569,7 +568,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -569,7 +568,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
// write // write
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1, NULL); code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1);
if (code) goto _err; if (code) goto _err;
// push SBlockL // push SBlockL
......
...@@ -679,6 +679,7 @@ _err: ...@@ -679,6 +679,7 @@ _err:
static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
#if 0
int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
int64_t n; int64_t n;
...@@ -729,12 +730,14 @@ static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBloc ...@@ -729,12 +730,14 @@ static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBloc
return code; return code;
_err: _err:
#endif
return code; return code;
} }
static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf,
uint8_t **ppBuf) { uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
#if 0
int64_t size; int64_t size;
int64_t n; int64_t n;
...@@ -822,6 +825,7 @@ static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, ...@@ -822,6 +825,7 @@ static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol,
return code; return code;
_err: _err:
#endif
return code; return code;
} }
...@@ -900,10 +904,11 @@ _exit: ...@@ -900,10 +904,11 @@ _exit:
static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) { uint8_t **ppBuf2) {
int32_t code = 0;
#if 0
TdFilePtr pFD = pReader->pDataFD; TdFilePtr pFD = pReader->pDataFD;
SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock]; SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL; SArray *aBlockCol = NULL;
int32_t code = 0;
int64_t offset; int64_t offset;
int64_t size; int64_t size;
int64_t n; int64_t n;
...@@ -1012,6 +1017,7 @@ _exit: ...@@ -1012,6 +1017,7 @@ _exit:
_err: _err:
taosArrayDestroy(aBlockCol); taosArrayDestroy(aBlockCol);
#endif
return code; return code;
} }
...@@ -1071,7 +1077,8 @@ _err: ...@@ -1071,7 +1077,8 @@ _err:
static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData, static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2) { uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0; int32_t code = 0;
#if 0
uint8_t *p; uint8_t *p;
int64_t size; int64_t size;
int64_t n; int64_t n;
...@@ -1148,6 +1155,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32 ...@@ -1148,6 +1155,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32
_err: _err:
tsdbError("vgId:%d, tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
taosArrayDestroy(aBlockCol); taosArrayDestroy(aBlockCol);
#endif
return code; return code;
} }
...@@ -1218,6 +1226,7 @@ _err: ...@@ -1218,6 +1226,7 @@ _err:
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) { uint8_t **ppBuf2) {
int32_t code = 0; int32_t code = 0;
#if 0
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
...@@ -1336,11 +1345,13 @@ int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *p ...@@ -1336,11 +1345,13 @@ int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *p
_err: _err:
tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
#endif
return code; return code;
} }
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf) { int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
#if 0
TdFilePtr pFD = pReader->pSmaFD; TdFilePtr pFD = pReader->pSmaFD;
int64_t offset = pBlock->aSubBlock[0].sOffset; int64_t offset = pBlock->aSubBlock[0].sOffset;
int64_t size = pBlock->aSubBlock[0].nSma * sizeof(SColumnDataAgg) + sizeof(TSCKSUM); int64_t size = pBlock->aSubBlock[0].nSma * sizeof(SColumnDataAgg) + sizeof(TSCKSUM);
...@@ -1391,6 +1402,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD ...@@ -1391,6 +1402,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
_err: _err:
tsdbError("vgId:%d, read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf); tFree(pBuf);
#endif
return code; return code;
} }
...@@ -1899,14 +1911,12 @@ _err: ...@@ -1899,14 +1911,12 @@ _err:
} }
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf) { int8_t cmprAlg, int8_t toLast) {
int32_t code = 0; int32_t code = 0;
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
SDiskData *pDiskData = &pWriter->dData; SDiskData *pDiskData = &pWriter->dData;
uint8_t *pBuf = NULL;
if (!ppBuf) ppBuf = &pBuf;
#if 0
// convert // convert
code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg); code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg);
if (code) goto _err; if (code) goto _err;
...@@ -1971,6 +1981,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1971,6 +1981,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) {
} }
#endif
// ================= SMA ==================== // ================= SMA ====================
if (pSmaInfo) { if (pSmaInfo) {
...@@ -1979,11 +1990,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1979,11 +1990,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
} }
_exit: _exit:
tFree(pBuf);
return code; return code;
_err: _err:
tFree(pBuf);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册