From a4d16f1c0029c245446fba6009b14baca9af1c0f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 27 Sep 2022 17:36:02 +0800 Subject: [PATCH] more code --- include/util/tcompression.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 6 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 7 +- source/dnode/vnode/src/tsdb/tsdbDiskData.c | 97 +++++++++++++--------- source/util/src/tcompression.c | 15 +++- 5 files changed, 78 insertions(+), 49 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 8f3d9634ec..84c2922c9e 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -131,7 +131,7 @@ typedef struct SCompressor SCompressor; int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor); int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); +int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 264746e8f2..ca16b87e17 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -326,9 +326,9 @@ int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg, uint8_t calcSma); -int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId); -int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData); -int32_t tDiskDataDestroy(SDiskData *pDiskData); +int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder); +int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId); +int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData); // structs ======================= struct STsdbFS { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 09b790ee81..7bc335d034 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -665,7 +665,7 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde .nRow = pBuilder->nRow}; // gnrt - code = tGnrtDiskData(pBuilder, &pBuilder->dd); + // code = tGnrtDiskData(pBuilder, &pBuilder->dd); TSDB_CHECK_CODE(code, lino, _exit); // write @@ -1369,7 +1369,6 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) if (!pBuilder->suid && !pBuilder->uid) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); - TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; code = tDiskDataBuilderInit(pCommitter->dWriter.pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); TSDB_CHECK_CODE(code, lino, _exit); @@ -1396,7 +1395,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { TSDBROW row = tsdbRowFromBlockData(pBData, iRow); - code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); + code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); TSDB_CHECK_CODE(code, lino, _exit); if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { @@ -1435,7 +1434,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { pTSchema = pCommitter->skmRow.pTSchema; } - code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); + code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbNextCommitRow(pCommitter); diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index bbf4e3337c..8cba73d353 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -31,9 +31,16 @@ struct SDiskColBuilder { SColumnDataAgg sma; uint8_t minSet; uint8_t maxSet; - uint8_t *aBuf[1]; + uint8_t *aBuf[2]; }; +// SDiskData ================================================ +static int32_t tDiskDataDestroy(SDiskData *pDiskData) { + int32_t code = 0; + pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol); + return code; +} + // SDiskColBuilder ================================================ #define tDiskColBuilderCreate() \ (SDiskColBuilder) { 0 } @@ -91,7 +98,7 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) { .type = pBuilder->type, .smaOn = pBuilder->calcSma, .flag = pBuilder->flag, - .szOrigin = 0, // todo + .szOrigin = 0, .szBitmap = 0, .szOffset = 0, .szValue = 0, @@ -109,20 +116,27 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) { nBit = BIT1_SIZE(pBuilder->nVal); } - pDiskCol->bCol.szBitmap = tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], 0, pBuilder->cmprAlg, - NULL, 0); // todo: alloc + code = tRealloc(&pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES); + if (code) return code; + + code = tRealloc(&pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES); + if (code) return code; + + pDiskCol->bCol.szBitmap = + tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES, + pBuilder->cmprAlg, pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES); pDiskCol->pBit = pBuilder->aBuf[0]; } // OFFSET if (IS_VAR_DATA_TYPE(pBuilder->type)) { - code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset); + code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset, NULL); if (code) return code; } // VALUE if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { - code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue); + code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue, &pDiskCol->bCol.szOrigin); if (code) return code; } @@ -425,8 +439,8 @@ static int32_t tDiskColAddVal(SDiskColBuilder *pBuilder, SColVal *pColVal) { } } - if (tDiskColAddValImpl[pBuilder->type][pColVal->type]) { - code = tDiskColAddValImpl[pBuilder->type][pColVal->type](pBuilder, pColVal); + if (tDiskColAddValImpl[pBuilder->flag][pColVal->flag]) { + code = tDiskColAddValImpl[pBuilder->flag][pColVal->flag](pBuilder, pColVal); if (code) return code; } @@ -465,6 +479,7 @@ void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { tFree(pBuilder->aBuf[iBuf]); } + tDiskDataDestroy(&pBuilder->dd); taosMemoryFree(pBuilder); return NULL; @@ -474,6 +489,8 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB uint8_t calcSma) { int32_t code = 0; + ASSERT(pId->suid || pId->uid); + pBuilder->suid = pId->suid; pBuilder->uid = pId->uid; pBuilder->nRow = 0; @@ -512,9 +529,9 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB } } - SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder); + SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder); - code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg, + code = tDiskColBuilderInit(pDCBuilder, pTColumn->colId, pTColumn->type, cmprAlg, (calcSma && (pTColumn->flags & COL_SMA_ON))); if (code) return code; @@ -524,14 +541,22 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB return code; } -int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { +int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder) { + int32_t code = 0; + pBuilder->suid = 0; + pBuilder->uid = 0; + return code; +} + +int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { int32_t code = 0; + ASSERT(pBuilder->suid || pBuilder->uid); ASSERT(pId->suid == pBuilder->suid); // uid if (pBuilder->uid && pBuilder->uid != pId->uid) { - ASSERT(!pBuilder->calcSma); + ASSERT(pBuilder->suid); for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) { code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t)); if (code) return code; @@ -564,14 +589,13 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche pColVal = tRowIterNext(&iter); } - if (pColVal == NULL || pColVal->cid > pDCBuilder->cid) { - SColVal cv = COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type); - code = tDiskColAddVal(pDCBuilder, &cv); - if (code) return code; - } else { + if (pColVal && pColVal->cid == pDCBuilder->cid) { code = tDiskColAddVal(pDCBuilder, pColVal); if (code) return code; pColVal = tRowIterNext(&iter); + } else { + code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type)); + if (code) return code; } } pBuilder->nRow++; @@ -579,11 +603,14 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche return code; } -int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { +int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData) { int32_t code = 0; ASSERT(pBuilder->nRow); + *ppDiskData = NULL; + + SDiskData *pDiskData = &pBuilder->dd; // reset SDiskData pDiskData->hdr = (SDiskDataHdr){.delimiter = TSDB_FILE_DLMT, .fmtVer = 0, @@ -598,30 +625,32 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { pDiskData->pUid = NULL; pDiskData->pVer = NULL; pDiskData->pKey = NULL; - if (pDiskData->aDiskCol) { - taosArrayClear(pDiskData->aDiskCol); - } else { - pDiskData->aDiskCol = taosArrayInit(pBuilder->nBuilder, sizeof(SDiskCol)); - if (pDiskData->aDiskCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; - } - } // UID if (pBuilder->uid == 0) { - code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid); + code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid, NULL); if (code) return code; } // VERSION - code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer); + code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer, NULL); if (code) return code; // TSKEY - code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey); + code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey, NULL); if (code) return code; + // aDiskCol + if (pDiskData->aDiskCol) { + taosArrayClear(pDiskData->aDiskCol); + } else { + pDiskData->aDiskCol = taosArrayInit(pBuilder->nBuilder, sizeof(SDiskCol)); + if (pDiskData->aDiskCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + } + int32_t offset = 0; for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); @@ -644,12 +673,6 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { pDiskData->hdr.szBlkCol += tPutBlockCol(NULL, &dCol.bCol); } - return code; -} - -// SDiskData ================================================ -int32_t tDiskDataDestroy(SDiskData *pDiskData) { - int32_t code = 0; - pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol); + *ppDiskData = pDiskData; return code; } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 133d1b29b7..8c38b4f20d 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -2000,16 +2000,23 @@ int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { return code; } -int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { +int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin) { int32_t code = 0; - *ppData = NULL; - *nData = 0; + *ppOut = NULL; + *nOut = 0; + if (nOrigin) { + if (DATA_TYPE_INFO[pCmprsor->type].isVarLen) { + *nOrigin = pCmprsor->nBuf - 1; + } else { + *nOrigin = pCmprsor->nVal * DATA_TYPE_INFO[pCmprsor->type].bytes; + } + } if (pCmprsor->nVal == 0) return code; if (DATA_TYPE_INFO[pCmprsor->type].endFn) { - return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData); + return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppOut, nOut); } return code; -- GitLab