diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 72296e455169652d643381336faff656bf9aaae1..08331b13279b8780a97fe044622fa33394a57f8a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -45,7 +45,7 @@ typedef struct SBlockIdx SBlockIdx; typedef struct SBlock SBlock; typedef struct SBlockL SBlockL; typedef struct SColData SColData; -typedef struct SBlockDataHdr SBlockDataHdr; +typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SBlockData SBlockData; typedef struct SDiskData SDiskData; typedef struct SDelFile SDelFile; @@ -155,6 +155,9 @@ int32_t tDiskDataInit(SDiskData *pDiskData); void tDiskDataClear(SDiskData *pDiskData); int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); +// SDiskDataHdr +int32_t tPutDiskDataHdr(uint8_t *p, void *ph); +int32_t tGetDiskDataHdr(uint8_t *p, void *ph); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -236,10 +239,8 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf); -int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, - uint8_t **ppBuf2, int8_t cmprAlg); -int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1, - uint8_t **ppBuf2, int8_t cmprAlg); +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, + int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader @@ -403,9 +404,9 @@ typedef struct { int8_t smaOn; int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE int32_t szOrigin; // original column value size (only save for variant data type) - int32_t szBitmap; // bitmap size - int32_t szOffset; // size of offset, only for variant-length data type - int32_t szValue; // compressed column value size + int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL + int32_t szOffset; // offset size, 0 only for non-variant-length type + int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE) int32_t offset; uint8_t **ppData; } SBlockCol; @@ -414,28 +415,33 @@ typedef struct { int64_t offset; // block data offset int32_t szBlock; int32_t szKey; -} SSubBlock; +} SBlockInfo; + +typedef struct { + int64_t offset; + int32_t size; +} SSmaInfo; struct SBlock { - TSDBKEY minKey; - TSDBKEY maxKey; - int64_t minVer; - int64_t maxVer; - int32_t nRow; - int8_t hasDup; - int8_t nSubBlock; - SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; - int64_t sOffset; // sma offset - int32_t nSma; // sma size + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t minVer; + int64_t maxVer; + int32_t nRow; + int8_t hasDup; + int8_t nSubBlock; + SBlockInfo aSubBlock[TSDB_MAX_SUBBLOCKS]; + SSmaInfo smaInfo; }; struct SBlockL { - int64_t suid; - int64_t minUid; - int64_t maxUid; - int64_t minVer; - int64_t maxVer; - int32_t nRow; + int64_t suid; + int64_t minUid; + int64_t maxUid; + int64_t minVer; + int64_t maxVer; + int32_t nRow; + SBlockInfo bInfo; }; struct SColData { @@ -498,15 +504,15 @@ struct SDelIdx { int64_t size; }; -struct SBlockDataHdr { +struct SDiskDataHdr { uint32_t delimiter; - int32_t nRow; int64_t suid; int64_t uid; int32_t szUid; int32_t szVer; int32_t szKey; int32_t szBlkCol; + int32_t nRow; int8_t cmprAlg; }; @@ -575,6 +581,14 @@ struct SDelFWriter { TdFilePtr pWriteH; }; +struct SDiskData { + SDiskDataHdr hdr; + uint8_t **ppKey; + SArray *aBlockCol; // SArray + int32_t nBuf; + SArray *aBuf; // SArray +}; + struct SDataFWriter { STsdb *pTsdb; SDFileSet wSet; @@ -588,6 +602,8 @@ struct SDataFWriter { SDataFile fData; SLastFile fLast; SSmaFile fSma; + + SDiskData dData; }; struct STsdbReadSnap { @@ -596,24 +612,6 @@ struct STsdbReadSnap { STsdbFS fs; }; -struct SDiskData { - int8_t cmprAlg; - int32_t nRow; - int64_t suid; - int64_t uid; - int32_t szUid; - int32_t szVer; - int32_t szKey; - - uint8_t *pUid; - uint8_t *pVer; - uint8_t *pKey; - SArray *aBlockCol; // SArray - int32_t nBuf; - SArray *aBuf; // SArray - uint8_t *pBuf; -}; - // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index e5c6ee44a0e8b6afcbf25d2a1c728087a929d201..cd94762c7b2e32218c3f814124dcd4e75ec87fb2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -494,10 +494,11 @@ _exit: } static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { - int32_t code = 0; - SBlock block; + int32_t code = 0; + SBlockData *pBlockData = &pCommitter->dWriter.bData; + SBlock block; - ASSERT(pCommitter->dWriter.bData.nRow > 0); + ASSERT(pBlockData->nRow > 0); if (pBlock) { block = *pBlock; // as a subblock @@ -505,37 +506,84 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { tBlockReset(&block); // as a new block } - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL, - pCommitter->cmprAlg); - if (code) goto _exit; + // statistic + block.nRow += pBlockData->nRow; + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; + + if (iRow == 0) { + if (tsdbKeyCmprFn(&block.minKey, &key) > 0) { + block.minKey = key; + } + } else { + if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { + block.hasDup = 1; + } + } + if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) { + block.maxKey = key; + } + + block.minVer = TMIN(block.minVer, key.version); + block.maxVer = TMAX(block.maxVer, key.version); + } + + // write + code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock++], + &block.smaInfo, pCommitter->cmprAlg, 0, NULL); + if (code) goto _err; + + // put SBlock code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); - if (code) goto _exit; + if (code) goto _err; - tBlockDataClearData(&pCommitter->dWriter.bData); + // clear + tBlockDataClearData(pBlockData); -_exit: + return code; + +_err: + tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { - int32_t code = 0; - SBlockL blockL; - - ASSERT(pCommitter->dWriter.bDatal.nRow > 0); + int32_t code = 0; + SBlockL blockL; + SBlockData *pBlockData = &pCommitter->dWriter.bDatal; + + ASSERT(pBlockData->nRow > 0); + + // statistic + blockL.suid = pBlockData->suid; + blockL.nRow = pBlockData->nRow; + blockL.minVer = VERSION_MAX; + blockL.maxVer = VERSION_MIN; + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]); + blockL.maxVer = TMIN(blockL.maxVer, pBlockData->aVersion[iRow]); + } + blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; + blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; - code = tsdbWriteLastBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, &blockL, NULL, NULL, - pCommitter->cmprAlg); - if (code) goto _exit; + // write + code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1, NULL); + if (code) goto _err; + // push SBlockL if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + goto _err; } - tBlockDataClearData(&pCommitter->dWriter.bDatal); + // clear + tBlockDataClearData(pBlockData); -_exit: + return code; + +_err: + tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index eeb11abc97fef382347d8923c37865d54a5c2a00..c542ab5583db163aeec2d3aae1d8319866a55ab6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -85,11 +85,11 @@ _err: } int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx) { - int32_t code = 0; - uint8_t *pBuf = NULL; - int64_t size; - int64_t n; - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pDelIdx->suid, .uid = pDelIdx->uid}; + int32_t code = 0; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; + SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pDelIdx->suid, .uid = pDelIdx->uid}; if (!ppBuf) ppBuf = &pBuf; @@ -106,7 +106,7 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf // build n = 0; - *(SBlockDataHdr *)(*ppBuf) = hdr; + *(SDiskDataHdr *)(*ppBuf) = hdr; n += sizeof(hdr); for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { n += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData)); @@ -306,13 +306,13 @@ _exit: } int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pDelIdx->offset; - int64_t size = pDelIdx->size; - int64_t n; - uint8_t *pBuf = NULL; - SBlockDataHdr *pHdr; - SDelData *pDelData = &(SDelData){0}; + int32_t code = 0; + int64_t offset = pDelIdx->offset; + int64_t size = pDelIdx->size; + int64_t n; + uint8_t *pBuf = NULL; + SDiskDataHdr *pHdr; + SDelData *pDelData = &(SDelData){0}; if (!ppBuf) ppBuf = &pBuf; @@ -344,7 +344,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData // // decode n = 0; - pHdr = (SBlockDataHdr *)(*ppBuf + n); + pHdr = (SDiskDataHdr *)(*ppBuf + n); ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); ASSERT(pHdr->suid == pDelIdx->suid); ASSERT(pHdr->uid == pDelIdx->uid); @@ -669,13 +669,13 @@ _err: } int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pBlockIdx->offset; - int64_t size = pBlockIdx->size; - uint8_t *pBuf = NULL; - int64_t n; - int64_t tn; - SBlockDataHdr hdr; + int32_t code = 0; + int64_t offset = pBlockIdx->offset; + int64_t size = pBlockIdx->size; + uint8_t *pBuf = NULL; + int64_t n; + int64_t tn; + SDiskDataHdr hdr; if (!ppBuf) ppBuf = &pBuf; @@ -706,7 +706,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl } // decode - hdr = *(SBlockDataHdr *)(*ppBuf); + hdr = *(SDiskDataHdr *)(*ppBuf); ASSERT(hdr.delimiter == TSDB_FILE_DLMT); ASSERT(hdr.suid == pBlockIdx->suid); ASSERT(hdr.uid == pBlockIdx->uid); @@ -729,7 +729,7 @@ _err: return code; } -static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { +static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); int64_t n; @@ -784,7 +784,7 @@ _err: return code; } -static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, +static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; int64_t size; @@ -877,7 +877,7 @@ _err: return code; } -static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SBlockDataHdr *pHdr, SArray *aBlockCol) { +static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr *pHdr, SArray *aBlockCol) { int32_t code = 0; int32_t n = 0; SBlockCol blockCol; @@ -890,8 +890,8 @@ static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SBlockDataHdr } // hdr - *pHdr = *(SBlockDataHdr *)pBuf; - n += sizeof(SBlockDataHdr); + *pHdr = *(SDiskDataHdr *)pBuf; + n += sizeof(SDiskDataHdr); // aBlockCol while (n < szBlockCol) { @@ -952,13 +952,13 @@ _exit: static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - TdFilePtr pFD = pReader->pDataFD; - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SArray *aBlockCol = NULL; - int32_t code = 0; - int64_t offset; - int64_t size; - int64_t n; + TdFilePtr pFD = pReader->pDataFD; + SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + SArray *aBlockCol = NULL; + int32_t code = 0; + int64_t offset; + int64_t size; + int64_t n; tBlockDataReset(pBlockData); pBlockData->nRow = pSubBlock->nRow; @@ -1123,13 +1123,13 @@ _err: static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - uint8_t *p; - int64_t size; - int64_t n; - TdFilePtr pFD = pReader->pDataFD; - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SArray *aBlockCol = NULL; + int32_t code = 0; + uint8_t *p; + int64_t size; + int64_t n; + TdFilePtr pFD = pReader->pDataFD; + SBlockInfo *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + SArray *aBlockCol = NULL; tBlockDataReset(pBlockData); @@ -1300,9 +1300,9 @@ int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *p } // decode block col - SBlockDataHdr hdr; - SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); - uint8_t *p = *ppBuf1; + SDiskDataHdr hdr; + SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); + uint8_t *p = *ppBuf1; if (aBlockCol == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -1461,6 +1461,8 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + code = tDiskDataInit(&pWriter->dData); + if (code) goto _err; pWriter->pTsdb = pTsdb; pWriter->wSet = (SDFileSet){.diskId = pSet->diskId, .fid = pSet->fid, @@ -1639,6 +1641,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { goto _err; } + tDiskDataClear(&(*ppWriter)->dData); taosMemoryFree(*ppWriter); _exit: *ppWriter = NULL; @@ -1786,17 +1789,17 @@ _err: } int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->fHead; - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - uint8_t *pBuf = NULL; - int64_t size; - int64_t n; + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->fHead; + SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; ASSERT(mBlock->nItem > 0); // prepare - size = sizeof(SBlockDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); + size = sizeof(SDiskDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); // alloc if (!ppBuf) ppBuf = &pBuf; @@ -1805,7 +1808,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, // build n = 0; - *(SBlockDataHdr *)(*ppBuf) = hdr; + *(SDiskDataHdr *)(*ppBuf) = hdr; n += sizeof(hdr); n += tPutMapData(*ppBuf + n, mBlock); taosCalcChecksumAppend(0, *ppBuf, size); @@ -1915,141 +1918,7 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { pBlock->nRow += pBlockData->nRow; } -static int32_t tsdbWriteDataArray(uint8_t *aData, int32_t nEle, int8_t type, int8_t cmprAlg, int32_t *rSize, - uint8_t **ppBuf1, int64_t nBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - int32_t size; - - if (IS_VAR_DATA_TYPE(type)) { - size = nEle; - } else { - size = tDataTypes[type].bytes * nEle; - } - - if (cmprAlg == NO_COMPRESSION) { - code = tRealloc(ppBuf1, nBuf1 + size); - if (code) goto _exit; - - memcpy(*ppBuf1 + nBuf1, aData, size); - *rSize = size; - } else { - code = tRealloc(ppBuf1, size + COMP_OVERFLOW_BYTES); - if (code) goto _exit; - - if (cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf2, size + COMP_OVERFLOW_BYTES); - if (code) goto _exit; - } - - int32_t n = tDataTypes[type].compFunc(aData, tDataTypes[type].bytes * nEle, nEle, *ppBuf1 + nBuf1, - size + COMP_OVERFLOW_BYTES, cmprAlg, *ppBuf2, size + COMP_OVERFLOW_BYTES); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _exit; - } - *rSize = n; - } - -_exit: - return code; -} - -static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, int8_t cmprAlg, uint8_t **ppBuf1, - int64_t nBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - int64_t size; - int64_t n = 0; - - // BITMAP - if (pColData->flag != HAS_VALUE) { - code = tsdbWriteDataArray(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, - &pBlockCol->szBitmap, ppBuf1, nBuf1 + n, ppBuf2); - if (code) goto _err; - } else { - pBlockCol->szBitmap = 0; - } - n += pBlockCol->szBitmap; - - // OFFSET - if (IS_VAR_DATA_TYPE(pColData->type)) { - code = tsdbWriteDataArray((uint8_t *)pColData->aOffset, pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, - &pBlockCol->szOffset, ppBuf1, nBuf1 + n, ppBuf2); - if (code) goto _err; - } else { - pBlockCol->szOffset = 0; - } - n += pBlockCol->szOffset; - - // VALUE - if (pColData->flag != (HAS_NULL | HAS_NONE)) { - ASSERT(pColData->nData); - code = tsdbWriteDataArray(pColData->pData, pColData->nData, pColData->type, cmprAlg, &pBlockCol->szValue, ppBuf1, - nBuf1 + n, ppBuf2); - if (code) goto _err; - } else { - ASSERT(pColData->nData == 0); - pBlockCol->szValue = 0; - } - n += pBlockCol->szValue; - - // checksum - n += sizeof(TSCKSUM); - code = tRealloc(ppBuf1, nBuf1 + n); - if (code) goto _err; - taosCalcChecksumAppend(0, *ppBuf1 + nBuf1, n); - - return code; - -_err: - return code; -} - -static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SBlockDataHdr hdr, SArray *aBlockCol, uint8_t *pData, - int64_t nData, uint8_t **ppBuf, int32_t *szBlockCol) { - int32_t code = 0; - int32_t nBlockCol = taosArrayGetSize(aBlockCol); - int64_t size; - int64_t n; - - // HDR + SArray - *szBlockCol = sizeof(hdr); - for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { - (*szBlockCol) += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol)); - } - - code = tRealloc(ppBuf, *szBlockCol + sizeof(TSCKSUM)); - if (code) goto _err; - - n = 0; - memcpy(*ppBuf, &hdr, sizeof(hdr)); - n += sizeof(hdr); - for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { - n += tPutBlockCol(*ppBuf + n, taosArrayGet(aBlockCol, iBlockCol)); - } - taosCalcChecksumAppend(0, *ppBuf, *szBlockCol + sizeof(TSCKSUM)); - - ASSERT(n == *szBlockCol); - - n = taosWriteFile(pFD, *ppBuf, *szBlockCol + sizeof(TSCKSUM)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // SBlockData - n = taosWriteFile(pFD, pData, nData); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - return code; - -_err: - return code; -} - -static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t **ppBuf) { +static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t **ppBuf) { int32_t code = 0; int64_t n; SColData *pColData; @@ -2093,230 +1962,93 @@ _err: return code; } -int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, - uint8_t **ppBuf2, int8_t cmprAlg) { - int32_t code = 0; - - ASSERT((pBlockData->suid && pBlockData->uid) || (!pBlockData->suid && pBlockData->uid)); - - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; - - tsdbUpdateBlockInfo(pBlockData, pBlock); - - SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; - - pSubBlock->nRow = pBlockData->nRow; - pSubBlock->cmprAlg = cmprAlg; - pSubBlock->offset = pWriter->fData.size; - - // ======================= BLOCK DATA ======================= - int64_t nBuf1 = 0; - - // VERSION - code = tsdbWriteDataArray((uint8_t *)pBlockData->aVersion, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, - &pSubBlock->szVersion, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - nBuf1 += pSubBlock->szVersion; +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, + int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf) { + int32_t code = 0; + TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + SDiskData *pDiskData = &pWriter->dData; + uint8_t *pBuf = NULL; - // TSKEY - code = tsdbWriteDataArray((uint8_t *)pBlockData->aTSKEY, pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg, - &pSubBlock->szTSKEY, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - nBuf1 += pSubBlock->szTSKEY; + if (!ppBuf) ppBuf = &pBuf; - // checksum - nBuf1 += sizeof(TSCKSUM); - code = tRealloc(ppBuf1, nBuf1); + // convert + code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg); if (code) goto _err; - taosCalcChecksumAppend(0, *ppBuf1, nBuf1); - - // COLUMNS - SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); - if (aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - int32_t offset = 0; - for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); - SBlockCol blockCol = {0}; - - ASSERT(pColData->flag); - - if (pColData->flag == HAS_NONE) continue; - - blockCol.cid = pColData->cid; - blockCol.type = pColData->type; - blockCol.smaOn = pColData->smaOn; - blockCol.flag = pColData->flag; - blockCol.szOrigin = pColData->nData; - if (pColData->flag != HAS_NULL) { - code = tsdbWriteColData(pColData, &blockCol, cmprAlg, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - - blockCol.offset = offset; - offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); - nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); - } - - if (taosArrayPush(aBlockCol, &blockCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + // write the block + if (toLast) { + pBlkInfo->offset = pWriter->fLast.size; + } else { + pBlkInfo->offset = pWriter->fData.size; } - // write - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockData->suid, .uid = pBlockData->uid}; - code = tsdbWriteBlockDataImpl(pWriter->pDataFD, hdr, aBlockCol, *ppBuf1, nBuf1, ppBuf2, &pSubBlock->szBlockCol); + // HDR and KEY + int32_t size = tPutDiskDataHdr(NULL, &pDiskData->hdr); + code = tRealloc(ppBuf, size); if (code) goto _err; - pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nBuf1; - pWriter->fData.size += pSubBlock->szBlock; + tPutDiskDataHdr(*ppBuf, &pDiskData->hdr); - // ======================= BLOCK SMA ======================= - pSubBlock->sOffset = 0; - pSubBlock->nSma = 0; - - if (pBlock->nSubBlock > 1 || pBlock->hasDup) goto _exit; - - code = tsdbWriteBlockSma(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1); - if (code) goto _err; + TSCKSUM cksm = taosCalcChecksum(0, *ppBuf, size); - if (pSubBlock->nSma > 0) { - pSubBlock->sOffset = pWriter->fSma.size; - pWriter->fSma.size += (sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); + int64_t n = taosWriteFile(pFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } -_exit: - tFree(pBuf1); - tFree(pBuf2); - taosArrayDestroy(aBlockCol); - return code; - -_err: - tsdbError("vgId:%d, write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); - tFree(pBuf1); - tFree(pBuf2); - taosArrayDestroy(aBlockCol); - return code; -} - -int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1, - uint8_t **ppBuf2, int8_t cmprAlg) { - int32_t code = 0; - - ASSERT((pBlockData->suid && !pBlockData->uid) || (!pBlockData->suid && pBlockData->uid)); - - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; + cksm = taosCalcChecksum(cksm, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey); + n = taosWriteFile(pFD, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - pBlockL->suid = pBlockData->suid; - if (pBlockData->uid) { - pBlockL->maxUid = pBlockL->minUid = pBlockData->uid; - } else { - pBlockL->minUid = pBlockData->aUid[0]; - pBlockL->maxUid = pBlockData->aUid[pBlockData->nRow - 1]; - } - pBlockL->minVer = VERSION_MAX; - pBlockL->maxVer = VERSION_MIN; - pBlockL->nRow = pBlockData->nRow; - pBlockL->offset = pWriter->fLast.size; - pBlockL->cmprAlg = cmprAlg; - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - pBlockL->minVer = TMIN(pBlockL->minVer, pBlockData->aVersion[iRow]); - pBlockL->maxVer = TMAX(pBlockL->maxVer, pBlockData->aVersion[iRow]); + n = taosWriteFile(pFD, &cksm, sizeof(cksm)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - // ======================= BLOCK DATA ======================= - int64_t nBuf1 = 0; + pBlkInfo->szKey = size + pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey + sizeof(TSCKSUM); - // UID - if (pBlockData->uid == 0) { - code = tsdbWriteDataArray((uint8_t *)pBlockData->aUid, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, - &pBlockL->szUid, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - } else { - pBlockL->szUid = 0; + // SBlockCol + if (pDiskData->hdr.szBlkCol == 0) { + pBlkInfo->szBlock = pBlkInfo->szKey; + goto _write_sma; } - nBuf1 += pBlockL->szUid; - // VERSION - code = tsdbWriteDataArray((uint8_t *)pBlockData->aVersion, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, - &pBlockL->szVer, ppBuf1, nBuf1, ppBuf2); + code = tRealloc(ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); if (code) goto _err; - nBuf1 += pBlockL->szVer; - // TSKEY - code = tsdbWriteDataArray((uint8_t *)pBlockData->aTSKEY, pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg, - &pBlockL->szTSKEY, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - nBuf1 += pBlockL->szTSKEY; - - // checksum - nBuf1 += sizeof(TSCKSUM); - code = tRealloc(ppBuf1, nBuf1); - if (code) goto _err; - taosCalcChecksumAppend(0, *ppBuf1, nBuf1); + n = 0; + for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { + n += tPutBlockCol(*ppBuf + n, taosArrayGet(pDiskData->aBlockCol, iBlockCol)); + } + taosCalcChecksumAppend(0, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); - // COLUMNS - SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); - if (aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + n = taosWriteFile(pFD, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); goto _err; } - int32_t offset = 0; - for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); - SBlockCol blockCol = {0}; - - ASSERT(pColData->flag); - - if (pColData->flag == HAS_NONE) continue; - - blockCol.cid = pColData->cid; - blockCol.type = pColData->type; - blockCol.smaOn = pColData->smaOn; - blockCol.flag = pColData->flag; - blockCol.szOrigin = pColData->nData; - if (pColData->flag != HAS_NULL) { - code = tsdbWriteColData(pColData, &blockCol, cmprAlg, ppBuf1, nBuf1, ppBuf2); - if (code) goto _err; - - blockCol.offset = offset; - offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); - nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); - } - - if (taosArrayPush(aBlockCol, &blockCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { } - // write - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockData->suid, .uid = pBlockData->uid}; - code = tsdbWriteBlockDataImpl(pWriter->pLastFD, hdr, aBlockCol, *ppBuf1, nBuf1, ppBuf2, &pBlockL->szBlockCol); - if (code) goto _err; +// ================= SMA ==================== +_write_sma: + if (toLast) goto _exit; + if (pSmaInfo == NULL) goto _exit; - pBlockL->szBlock = pBlockL->szBlockCol + sizeof(TSCKSUM) + nBuf1; - pWriter->fLast.size += pBlockL->szBlock; + // TODO - tFree(pBuf1); - tFree(pBuf2); +_exit: + tFree(pBuf); return code; _err: - tFree(pBuf1); - tFree(pBuf2); + tFree(pBuf); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 13d293f86b6445a6897115bb553e395ef966bb21..72d9175078f37cebcafcf9ef2b9f31c8e28d39a7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -217,31 +217,30 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { // SBlock ====================================================== void tBlockReset(SBlock *pBlock) { - *pBlock = - (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVersion = VERSION_MAX, .maxVersion = VERSION_MIN}; + *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; } int32_t tPutBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey); - n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey); + n += tPutI64v(p ? p + n : p, pBlock->minKey.version); + n += tPutI64(p ? p + n : p, pBlock->minKey.ts); + n += tPutI64v(p ? p + n : p, pBlock->maxKey.version); + n += tPutI64(p ? p + n : p, pBlock->maxKey.ts); n += tPutI64v(p ? p + n : p, pBlock->minVer); n += tPutI64v(p ? p + n : p, pBlock->maxVer); n += tPutI32v(p ? p + n : p, pBlock->nRow); n += tPutI8(p ? p + n : p, pBlock->hasDup); n += tPutI8(p ? p + n : p, pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); - n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlockCol); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY); n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].sOffset); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nSma); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szKey); + } + if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { + n += tPutI64v(p ? p + n : p, pBlock->smaInfo.offset); + n += tPutI32v(p ? p + n : p, pBlock->smaInfo.size); } return n; @@ -251,23 +250,26 @@ int32_t tGetBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - n += tGetTSDBKEY(p + n, &pBlock->minKey); - n += tGetTSDBKEY(p + n, &pBlock->maxKey); + n += tGetI64v(p + n, &pBlock->minKey.version); + n += tGetI64(p + n, &pBlock->minKey.ts); + n += tGetI64v(p + n, &pBlock->maxKey.version); + n += tGetI64(p + n, &pBlock->maxKey.ts); n += tGetI64v(p + n, &pBlock->minVer); n += tGetI64v(p + n, &pBlock->maxVer); n += tGetI32v(p + n, &pBlock->nRow); n += tGetI8(p + n, &pBlock->hasDup); n += tGetI8(p + n, &pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); - n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlockCol); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY); n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].sOffset); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nSma); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szKey); + } + if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { + n += tGetI64v(p + n, &pBlock->smaInfo.offset); + n += tGetI32v(p + n, &pBlock->smaInfo.size); + } else { + pBlock->smaInfo.offset = 0; + pBlock->smaInfo.size = 0; } return n; @@ -290,8 +292,9 @@ bool tBlockHasSma(SBlock *pBlock) { if (pBlock->nSubBlock > 1) return false; if (pBlock->hasDup) return false; - return pBlock->aSubBlock[0].nSma > 0; + return pBlock->smaInfo.size > 0; } + // SBlockL ====================================================== int32_t tPutBlockL(uint8_t *p, void *ph) { int32_t n = 0; @@ -303,13 +306,9 @@ int32_t tPutBlockL(uint8_t *p, void *ph) { n += tPutI64v(p ? p + n : p, pBlockL->minVer); n += tPutI64v(p ? p + n : p, pBlockL->maxVer); n += tPutI32v(p ? p + n : p, pBlockL->nRow); - n += tPutI64v(p ? p + n : p, pBlockL->offset); - n += tPutI8(p ? p + n : p, pBlockL->cmprAlg); - n += tPutI32v(p ? p + n : p, pBlockL->szBlockCol); - n += tPutI32v(p ? p + n : p, pBlockL->szUid); - n += tPutI32v(p ? p + n : p, pBlockL->szVer); - n += tPutI32v(p ? p + n : p, pBlockL->szTSKEY); - n += tPutI32v(p ? p + n : p, pBlockL->szBlock); + n += tPutI64v(p ? p + n : p, pBlockL->bInfo.offset); + n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szBlock); + n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szKey); return n; } @@ -324,13 +323,9 @@ int32_t tGetBlockL(uint8_t *p, void *ph) { n += tGetI64v(p + n, &pBlockL->minVer); n += tGetI64v(p + n, &pBlockL->maxVer); n += tGetI32v(p + n, &pBlockL->nRow); - n += tGetI64v(p + n, &pBlockL->offset); - n += tGetI8(p + n, &pBlockL->cmprAlg); - n += tGetI32v(p + n, &pBlockL->szBlockCol); - n += tGetI32v(p + n, &pBlockL->szUid); - n += tGetI32v(p + n, &pBlockL->szVer); - n += tGetI32v(p + n, &pBlockL->szTSKEY); - n += tGetI32v(p + n, &pBlockL->szBlock); + n += tGetI64v(p + n, &pBlockL->bInfo.offset); + n += tGetI32v(p + n, &pBlockL->bInfo.szBlock); + n += tGetI32v(p + n, &pBlockL->bInfo.szKey); return n; } @@ -346,15 +341,25 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { n += tPutI8(p ? p + n : p, pBlockCol->type); n += tPutI8(p ? p + n : p, pBlockCol->smaOn); n += tPutI8(p ? p + n : p, pBlockCol->flag); + n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); if (pBlockCol->flag != HAS_NULL) { - n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); - n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); - n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); - n += tPutI32v(p ? p + n : p, pBlockCol->szValue); + if (pBlockCol->flag != HAS_VALUE) { + n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); + } + + if (IS_VAR_DATA_TYPE(pBlockCol->type)) { + n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); + } + + if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { + n += tPutI32v(p ? p + n : p, pBlockCol->szValue); + } + n += tPutI32v(p ? p + n : p, pBlockCol->offset); } +_exit: return n; } @@ -366,14 +371,28 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { n += tGetI8(p + n, &pBlockCol->type); n += tGetI8(p + n, &pBlockCol->smaOn); n += tGetI8(p + n, &pBlockCol->flag); + n += tGetI32v(p + n, &pBlockCol->szOrigin); ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); + pBlockCol->szBitmap = 0; + pBlockCol->szOffset = 0; + pBlockCol->szValue = 0; + pBlockCol->offset = 0; + if (pBlockCol->flag != HAS_NULL) { - n += tGetI32v(p + n, &pBlockCol->szOrigin); - n += tGetI32v(p + n, &pBlockCol->szBitmap); - n += tGetI32v(p + n, &pBlockCol->szOffset); - n += tGetI32v(p + n, &pBlockCol->szValue); + if (pBlockCol->flag != HAS_VALUE) { + n += tGetI32v(p + n, &pBlockCol->szBitmap); + } + + if (IS_VAR_DATA_TYPE(pBlockCol->type)) { + n += tGetI32v(p + n, &pBlockCol->szOffset); + } + + if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { + n += tGetI32v(p + n, &pBlockCol->szValue); + } + n += tGetI32v(p + n, &pBlockCol->offset); } @@ -1650,22 +1669,35 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm taosArrayClear(pDiskData->aBlockCol); pDiskData->nBuf = 0; - // uid - if (pDiskData->uid == 0) { - code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, - &pDiskData->pUid, &pDiskData->szUid, &pDiskData->pBuf); - if (code) goto _exit; - } + { + pDiskData->ppKey = tDiskDataAllocBuf(pDiskData); + if (pDiskData->ppKey == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } - // version - code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, - cmprAlg, &pDiskData->pVer, &pDiskData->szVer, &pDiskData->pBuf); - if (code) goto _exit; + int32_t n = 0; + // uid + if (pDiskData->uid == 0) { + code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL); + if (code) goto _exit; + } else { + pDiskData->szUid = 0; + } + n += pDiskData->szUid; - // tskey - code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, - cmprAlg, &pDiskData->pKey, &pDiskData->szKey, &pDiskData->pBuf); - if (code) goto _exit; + // version + code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL); + if (code) goto _exit; + n += pDiskData->szVer; + + // tskey + code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, + cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL); + if (code) goto _exit; + } // columns int32_t offset = 0; @@ -1689,7 +1721,7 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm } // compress - code = tsdbCmprColData(pColData, cmprAlg, &blockCol); + code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL); if (code) goto _exit; // update offset @@ -1709,6 +1741,41 @@ _exit: int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); +// SDiskDataHdr ============================== +int32_t tPutDiskDataHdr(uint8_t *p, void *ph) { + int32_t n = 0; + SDiskDataHdr *pHdr = (SDiskDataHdr *)ph; + + n += tPutU32(p ? p + n : p, pHdr->delimiter); + n += tPutI64(p ? p + n : p, pHdr->suid); + n += tPutI64(p ? p + n : p, pHdr->uid); + n += tPutI32v(p ? p + n : p, pHdr->szUid); + n += tPutI32v(p ? p + n : p, pHdr->szVer); + n += tPutI32v(p ? p + n : p, pHdr->szKey); + n += tPutI32v(p ? p + n : p, pHdr->szBlkCol); + n += tPutI32v(p ? p + n : p, pHdr->nRow); + n += tPutI8(p ? p + n : p, pHdr->cmprAlg); + + return n; +} + +int32_t tGetDiskDataHdr(uint8_t *p, void *ph) { + int32_t n = 0; + SDiskDataHdr *pHdr = (SDiskDataHdr *)ph; + + n += tGetU32(p + n, &pHdr->delimiter); + n += tGetI64(p + n, &pHdr->suid); + n += tGetI64(p + n, &pHdr->uid); + n += tGetI32v(p + n, &pHdr->szUid); + n += tGetI32v(p + n, &pHdr->szVer); + n += tGetI32v(p + n, &pHdr->szKey); + n += tGetI32v(p + n, &pHdr->szBlkCol); + n += tGetI32v(p + n, &pHdr->nRow); + n += tGetI8(p + n, &pHdr->cmprAlg); + + return n; +} + // ALGORITHM ============================== void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal;