diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f668fcf4a990931b5b31c35af73f6f9eaf4a6579..373df156115ba0c375c520bec77fb42ba16d0946 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -61,15 +61,17 @@ typedef struct { uint64_t suid; } STableListInfo; +#pragma pack(push, 1) typedef struct SColumnDataAgg { int16_t colId; - int16_t maxIndex; int16_t minIndex; + int16_t maxIndex; int16_t numOfNull; int64_t sum; int64_t max; int64_t min; } SColumnDataAgg; +#pragma pack(pop) typedef struct SDataBlockInfo { STimeWindow window; @@ -114,7 +116,7 @@ typedef struct SQueryTableDataCond { int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t type; // data block load type: + int32_t type; // data block load type: int32_t numOfTWindows; STimeWindow* twindows; int64_t startVersion; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a53f8422d47ceabe01dd11417c0dea11f0f322e3..1c1ded6a44e3e5961081bb5af3493e16cc914778 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -119,10 +119,7 @@ int32_t tPutBlockCol(uint8_t *p, void *ph); int32_t tGetBlockCol(uint8_t *p, void *ph); int32_t tBlockColCmprFn(const void *p1, const void *p2); // SBlock -#define tBlockInit() ((SBlock){0}) void tBlockReset(SBlock *pBlock); -void tBlockClear(SBlock *pBlock); -int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest); int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); @@ -134,11 +131,11 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tCmprBlockIdx(void const *lhs, void const *rhs); // SColdata #define tColDataInit() ((SColData){0}) -void tColDataReset(SColData *pColData, int16_t cid, int8_t type); +void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn); void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData 
*pColData, SColVal *pColVal); -int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); +int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); // SBlockData #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) @@ -166,9 +163,8 @@ void tsdbFree(uint8_t *pBuf); #define tMapDataInit() ((SMapData){0}) void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); -int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); -int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); +void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t tPutMapData(uint8_t *p, SMapData *pMapData); @@ -223,7 +219,6 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); -int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter); // SDataFReader @@ -373,31 +368,32 @@ struct SBlockIdx { struct SMapData { int32_t nItem; - uint8_t flag; - uint8_t *pOfst; - uint32_t nData; + int32_t *aOffset; + int32_t nData; uint8_t *pData; - uint8_t *pBuf; }; typedef struct { 
int16_t cid; int8_t type; int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE - int64_t offset; - int64_t bsize; // bitmap size - int64_t csize; // compressed column value size - int64_t osize; // original column value size (only save for variant data type) + int32_t offset; + int32_t szBitmap; // bitmap size + int32_t szOffset; // size of offset, only for variant-length data type + int32_t szValue; // compressed column value size + int32_t szOrigin; // original column value size (only save for variant data type) } SBlockCol; typedef struct { - int64_t nRow; - int8_t cmprAlg; - int64_t offset; - int64_t szVersion; // VERSION size - int64_t szTSKEY; // TSKEY size - int64_t szBlock; // total block size - SMapData mBlockCol; // SMapData + int32_t nRow; + int8_t cmprAlg; + int64_t offset; // block data offset + int32_t szBlockCol; // SBlockCol size + int32_t szVersion; // VERSION size + int32_t szTSKEY; // TSKEY size + int32_t szBlock; // total block size + int64_t sOffset; // sma offset + int32_t nSma; // sma size } SSubBlock; struct SBlock { @@ -425,7 +421,7 @@ struct SAggrBlkCol { struct SColData { int16_t cid; int8_t type; - int8_t offsetValid; + int8_t smaOn; int32_t nVal; uint8_t flag; uint8_t *pBitMap; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 51cc45d7f2fed4249217c85a6bc91fd80cea9871..6eea51a546c85b2e4250455ff3505c289fcef96c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -33,12 +33,10 @@ typedef struct { SDataFReader *pReader; SArray *aBlockIdx; // SArray SMapData oBlockMap; // SMapData, read from reader - SBlock oBlock; SBlockData oBlockData; SDataFWriter *pWriter; SArray *aBlockIdxN; // SArray SMapData nBlockMap; // SMapData - SBlock nBlock; SBlockData nBlockData; int64_t suid; int64_t uid; @@ -260,7 +258,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // old taosArrayClear(pCommitter->aBlockIdx); 
tMapDataReset(&pCommitter->oBlockMap); - tBlockReset(&pCommitter->oBlock); tBlockDataReset(&pCommitter->oBlockData); pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); if (pRSet) { @@ -274,7 +271,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // new taosArrayClear(pCommitter->aBlockIdxN); tMapDataReset(&pCommitter->nBlockMap); - tBlockReset(&pCommitter->nBlock); tBlockDataReset(&pCommitter->nBlockData); if (pRSet) { wSet = (SDFileSet){.diskId = pRSet->diskId, @@ -351,11 +347,6 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); if (code) goto _err; -#if 0 - code = tsdbWriteBlockSMA(pCommitter, pBlockData, pBlock); - if (code) goto _err; -#endif - code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); if (code) goto _err; @@ -371,7 +362,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; SBlockData *pBlockDataMerge = &pCommitter->oBlockData; SBlockData *pBlockData = &pCommitter->nBlockData; - SBlock *pBlock = &pCommitter->nBlock; + SBlock block; + SBlock *pBlock = &block; TSDBROW *pRow1; TSDBROW row2; TSDBROW *pRow2 = &row2; @@ -469,7 +461,8 @@ _err: static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { int32_t code = 0; TSDBROW *pRow; - SBlock *pBlock = &pCommitter->nBlock; + SBlock block; + SBlock *pBlock = &block; SBlockData *pBlockData = &pCommitter->nBlockData; int64_t suid = pIter->pTbData->suid; int64_t uid = pIter->pTbData->uid; @@ -519,13 +512,14 @@ _err: static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) { int32_t code = 0; + SBlock block; if (pBlock->last) { code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, 
&pCommitter->oBlockData, NULL, NULL); if (code) goto _err; - tBlockReset(&pCommitter->nBlock); - code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &pCommitter->nBlock, pBlockIdx, 0); + tBlockReset(&block); + code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0); if (code) goto _err; } else { code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); @@ -590,6 +584,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S int32_t code = 0; SBlockData *pBlockData = &pCommitter->nBlockData; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; + SBlock block; TSDBROW *pRow; tBlockDataReset(pBlockData); @@ -617,11 +612,8 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S } } - // write as a subblock - code = tBlockCopy(pBlock, &pCommitter->nBlock); - if (code) goto _err; - - code = tsdbCommitBlockData(pCommitter, pBlockData, &pCommitter->nBlock, pBlockIdx, 0); + block = *pBlock; + code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0); if (code) goto _err; return code; @@ -670,7 +662,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // start =========== tMapDataReset(&pCommitter->nBlockMap); - SBlock *pBlock = &pCommitter->oBlock; + SBlock block; + SBlock *pBlock = &block; iBlock = 0; if (iBlock < nBlock) { @@ -895,6 +888,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { _err: tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbDataFReaderClose(&pCommitter->pReader); + tsdbDataFWriterClose(&pCommitter->pWriter, 0); return code; } @@ -931,21 +926,23 @@ _err: static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { int32_t code = 0; - pCommitter->pReader = NULL; pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - pCommitter->oBlockMap = tMapDataInit(); - pCommitter->oBlock = tBlockInit(); - 
pCommitter->pWriter = NULL; + if (pCommitter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx)); - pCommitter->nBlockMap = tMapDataInit(); - pCommitter->nBlock = tBlockInit(); + if (pCommitter->aBlockIdxN == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + code = tBlockDataInit(&pCommitter->oBlockData); if (code) goto _exit; + code = tBlockDataInit(&pCommitter->nBlockData); - if (code) { - tBlockDataClear(&pCommitter->oBlockData); - goto _exit; - } + if (code) goto _exit; _exit: return code; @@ -953,13 +950,11 @@ _exit: static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->aBlockIdx); - // tMapDataClear(&pCommitter->oBlockMap); - // tBlockClear(&pCommitter->oBlock); - // tBlockDataClear(&pCommitter->oBlockData); + tMapDataClear(&pCommitter->oBlockMap); + tBlockDataClear(&pCommitter->oBlockData); taosArrayDestroy(pCommitter->aBlockIdxN); - // tMapDataClear(&pCommitter->nBlockMap); - // tBlockClear(&pCommitter->nBlock); - // tBlockDataClear(&pCommitter->nBlockData); + tMapDataClear(&pCommitter->nBlockMap); + tBlockDataClear(&pCommitter->nBlockData); } static int32_t tsdbCommitData(SCommitter *pCommitter) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a07597015269f5a9a8a2e335c2a9ab886d08d24c..e7360c3d1179222736c3a83718548b7d50561575 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -807,10 +807,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ for (int32_t j = 0; j < mapData.nItem; ++j) { SBlock block = {0}; - int32_t code = tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + tMapDataGetItemByIdx(&mapData, j, &block, tGetBlock); // 1. 
time range check if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 62ce35b6d89aba77039ab62222e79497f297cb38..c50de07a125a256b9c3c6466fe8f1c0f964cbca4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -489,6 +489,7 @@ _err: int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { int32_t code = 0; + if (*ppReader == NULL) goto _exit; if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -511,6 +512,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { } taosMemoryFree(*ppReader); + +_exit: *ppReader = NULL; return code; @@ -586,11 +589,14 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl int32_t code = 0; int64_t offset = pBlockIdx->offset; int64_t size = pBlockIdx->size; + uint8_t *pBuf = NULL; int64_t n; + int64_t tn; SBlockDataHdr hdr; + if (!ppBuf) ppBuf = &pBuf; + // alloc - if (!ppBuf) ppBuf = &mBlock->pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; @@ -623,17 +629,24 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl ASSERT(hdr.uid == pBlockIdx->uid); n = sizeof(hdr); - n += tGetMapData(*ppBuf + n, mBlock); + tn = tGetMapData(*ppBuf + n, mBlock); + if (tn < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + n += tn; ASSERT(n + sizeof(TSCKSUM) == size); + tsdbFree(pBuf); return code; _err: tsdbError("vgId:%d read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); return code; } -static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { +static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + 
sizeof(TSCKSUM); int64_t n; @@ -656,7 +669,6 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion); // TSKEY - pBuf = pBuf + pSubBlock->szVersion; memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY); } else { size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; @@ -674,9 +686,9 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl } // TSKEY - pBuf = pBuf + pSubBlock->szVersion; - n = tsDecompressTimestamp(pBuf, pSubBlock->szTSKEY, pSubBlock->nRow, (char *)pBlockData->aTSKEY, - sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size); + n = tsDecompressTimestamp(pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY, pSubBlock->nRow, + (char *)pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, + size); if (n < 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; @@ -689,15 +701,13 @@ _err: return code; } -static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock, SBlockCol *pBlockCol, - SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) { +static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, + uint8_t **ppBuf) { int32_t code = 0; int64_t size; int64_t n; - ASSERT(pBlockCol->flag != HAS_NULL); - - if (!taosCheckChecksumWhole(pBuf, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { + if (!taosCheckChecksumWhole(pBuf, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -705,40 +715,71 @@ static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock, pColData->nVal = pSubBlock->nRow; pColData->flag = pBlockCol->flag; - // bitmap + // BITMAP if (pBlockCol->flag != HAS_VALUE) { - size = BIT2_SIZE(pSubBlock->nRow); + ASSERT(pBlockCol->szBitmap); + + size = BIT2_SIZE(pColData->nVal); code = 
tsdbRealloc(&pColData->pBitMap, size); if (code) goto _err; - ASSERT(pBlockCol->bsize == size); + code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); + if (code) goto _err; - memcpy(pColData->pBitMap, pBuf, size); + n = tsDecompressTinyint(pBuf, pBlockCol->szBitmap, size, pColData->pBitMap, size, TWO_STAGE_COMP, *ppBuf, + size + COMP_OVERFLOW_BYTES); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + + ASSERT(n == size); } else { - ASSERT(pBlockCol->bsize == 0); + ASSERT(pBlockCol->szBitmap == 0); } - pBuf = pBuf + pBlockCol->bsize; + pBuf = pBuf + pBlockCol->szBitmap; + + // OFFSET + if (IS_VAR_DATA_TYPE(pColData->type)) { + ASSERT(pBlockCol->szOffset); - // value - if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - pColData->nData = pBlockCol->osize; + size = sizeof(int32_t) * pColData->nVal; + code = tsdbRealloc((uint8_t **)&pColData->aOffset, size); + if (code) goto _err; + + code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); + if (code) goto _err; + + n = tsDecompressInt(pBuf, pBlockCol->szOffset, pColData->nVal, (char *)pColData->aOffset, size, TWO_STAGE_COMP, + *ppBuf, size + COMP_OVERFLOW_BYTES); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _err; + } + + ASSERT(n == size); } else { - pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow; + ASSERT(pBlockCol->szOffset == 0); } + pBuf = pBuf + pBlockCol->szOffset; + + // VALUE + pColData->nData = pBlockCol->szOrigin; + code = tsdbRealloc(&pColData->pData, pColData->nData); if (code) goto _err; if (pSubBlock->cmprAlg == NO_COMPRESSION) { memcpy(pColData->pData, pBuf, pColData->nData); } else { - size = pColData->nData + COMP_OVERFLOW_BYTES; if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { - code = tsdbRealloc(ppBuf, size); + code = tsdbRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES); if (code) goto _err; } - n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->csize, pSubBlock->nRow, pColData->pData, - pColData->nData, pSubBlock->cmprAlg, *ppBuf, size); + n 
= tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->szValue, pSubBlock->nRow, pColData->pData, + pColData->nData, pSubBlock->cmprAlg, *ppBuf, + pColData->nData + COMP_OVERFLOW_BYTES); if (n < 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; @@ -753,11 +794,41 @@ _err: return code; } -static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, - int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { +static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlockCol) { + int32_t code = 0; + int32_t n = 0; + SBlockCol blockCol; + SBlockCol *pBlockCol = &blockCol; + + if (!taosCheckChecksumWhole(p, pSubBlock->szBlockCol + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + n += sizeof(SBlockDataHdr); + while (n < pSubBlock->szBlockCol) { + n += tGetBlockCol(p + n, pBlockCol); + + if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + ASSERT(n == pSubBlock->szBlockCol); + + return code; + +_err: + return code; +} + +static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, + int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { TdFilePtr pFD = pBlock->last ? 
pReader->pLastFD : pReader->pDataFD; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + SArray *aBlockCol = NULL; int32_t code = 0; int64_t offset; int64_t size; @@ -766,9 +837,15 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, tBlockDataReset(pBlockData); pBlockData->nRow = pSubBlock->nRow; - // TSDBKEY - offset = pSubBlock->offset + sizeof(SBlockDataHdr); - size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); + // TSDBKEY and SBlockCol + if (nCol == 1) { + offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM); + size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); + } else { + offset = pSubBlock->offset; + size = pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); + } + code = tsdbRealloc(ppBuf1, size); if (code) goto _err; @@ -787,31 +864,47 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); - if (code) goto _err; + if (nCol == 1) { + code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + if (code) goto _err; + + goto _exit; + } else { + aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol); + if (code) goto _err; + + code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2); + if (code) goto _err; + } - // OTHER - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - SColData *pColData; for (int32_t iCol = 1; iCol < nCol; iCol++) { - int16_t cid = aColId[iCol]; + void *p = taosArraySearch(aBlockCol, &(SBlockCol){.cid = aColId[iCol]}, tBlockColCmprFn, TD_EQ); + + if (p) { + SBlockCol *pBlockCol = (SBlockCol *)p; + SColData *pColData; + + ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - if 
(tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == - 0) { code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); if (code) goto _err; - tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0); if (pBlockCol->flag == HAS_NULL) { for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); if (code) goto _err; } } else { - offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->szVersion + pSubBlock->szTSKEY + - sizeof(TSCKSUM) + pBlockCol->offset; - size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + offset = pSubBlock->offset + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + + pSubBlock->szTSKEY + sizeof(TSCKSUM) + pBlockCol->offset; + size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); code = tsdbRealloc(ppBuf1, size); if (code) goto _err; @@ -833,14 +926,18 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); + code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); if (code) goto _err; } } } + +_exit: + taosArrayDestroy(aBlockCol); return code; _err: + taosArrayDestroy(aBlockCol); return code; } @@ -855,7 +952,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; - code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); + code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); if (code) goto _err; if (pBlock->nSubBlock > 1) { @@ -863,7 +960,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock 
*pBl SBlockData *pBlockData2 = &(SBlockData){0}; for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); + code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); if (code) goto _err; code = tBlockDataCopy(pBlockData, pBlockData2); @@ -904,7 +1001,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, int64_t n; TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - SBlockCol *pBlockCol = &(SBlockCol){0}; + SArray *aBlockCol = NULL; tBlockDataReset(pBlockData); @@ -929,41 +1026,52 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - // recover pBlockData->nRow = pSubBlock->nRow; - p = *ppBuf1 + sizeof(SBlockDataHdr); - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); + // TSDBKEY + p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM); + code = tsdbReadBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); + if (code) goto _err; + + // COLUMNS + aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol); if (code) goto _err; - p = p + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - SColData *pColData; + for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) { + SColData *pColData; + SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol); - tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData); if (code) goto _err; - 
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type, 0); if (pBlockCol->flag == HAS_NULL) { for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); if (code) goto _err; } } else { - code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, p, ppBuf2); + p = *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + + sizeof(TSCKSUM) + pBlockCol->offset; + code = tsdbReadColDataImpl(pSubBlock, pBlockCol, pColData, p, ppBuf2); if (code) goto _err; - - p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); } } + taosArrayDestroy(aBlockCol); return code; _err: tsdbError("vgId:%d tsdb read sub block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + taosArrayDestroy(aBlockCol); return code; } @@ -1189,6 +1297,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { int32_t code = 0; STsdb *pTsdb = (*ppWriter)->pTsdb; + if (*ppWriter == NULL) goto _exit; + if (sync) { if (taosFsyncFile((*ppWriter)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -1232,6 +1342,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { } taosMemoryFree(*ppWriter); +_exit: *ppWriter = NULL; return code; @@ -1366,246 +1477,390 @@ _err: return code; } -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, - SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) { - int32_t code = 0; - SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; - SBlockCol *pBlockCol = &(SBlockCol){0}; - int64_t size; - int64_t n; - TdFilePtr pFileFD = pBlock->last ? 
pWriter->pLastFD : pWriter->pDataFD; - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - TSCKSUM cksm; - uint8_t *p; - int64_t offset; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; - - TSKEY lastKey = TSKEY_MIN; +static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSDBKEY key = TSDBROW_KEY(&tsdbRowFromBlockData(pBlockData, iRow)); + TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; + if (iRow == 0) { - pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, key); + if (tsdbKeyCmprFn(&pBlock->minKey, &key) > 0) { + pBlock->minKey = key; + } + } else { + if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { + pBlock->hasDup = 1; + } } - if (iRow == pBlockData->nRow - 1) { - pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, key); + if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pBlock->maxKey, &key) < 0) { + pBlock->maxKey = key; } pBlock->minVersion = TMIN(pBlock->minVersion, key.version); pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version); - if (key.ts == lastKey) { - pBlock->hasDup = 1; - } - lastKey = key.ts; } pBlock->nRow += pBlockData->nRow; +} - pSubBlock->nRow = pBlockData->nRow; - pSubBlock->cmprAlg = cmprAlg; - if (pBlock->last) { - pSubBlock->offset = pWriter->wSet.fLast.size; +static int32_t tsdbWriteBlockDataKey(SSubBlock *pSubBlock, SBlockData *pBlockData, uint8_t **ppBuf1, int64_t *nDataP, + uint8_t **ppBuf2) { + int32_t code = 0; + int64_t size; + int64_t tsize; + + if (pSubBlock->cmprAlg == NO_COMPRESSION) { + pSubBlock->szVersion = sizeof(int64_t) * pSubBlock->nRow; + pSubBlock->szTSKEY = sizeof(TSKEY) * pSubBlock->nRow; + + code = tsdbRealloc(ppBuf1, *nDataP + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM)); + if (code) goto _err; + + // VERSION + memcpy(*ppBuf1 + *nDataP, pBlockData->aVersion, 
pSubBlock->szVersion); + + // TSKEY + memcpy(*ppBuf1 + *nDataP + pSubBlock->szVersion, pBlockData->aTSKEY, pSubBlock->szTSKEY); } else { - pSubBlock->offset = pWriter->wSet.fData.size; - } - pSubBlock->szBlock = 0; + size = (sizeof(int64_t) + sizeof(TSKEY)) * pSubBlock->nRow + COMP_OVERFLOW_BYTES * 2; - // HDR - n = taosWriteFile(pFileFD, &hdr, sizeof(hdr)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - pSubBlock->szBlock += n; + code = tsdbRealloc(ppBuf1, *nDataP + size + sizeof(TSCKSUM)); + if (code) goto _err; - // TSDBKEY - if (cmprAlg == NO_COMPRESSION) { - cksm = 0; + tsize = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; + if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, tsize); + if (code) goto _err; + } - // version - pSubBlock->szVersion = sizeof(int64_t) * pBlockData->nRow; - n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->szVersion); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + // VERSION + pSubBlock->szVersion = + tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, + *ppBuf1 + *nDataP, size, pSubBlock->cmprAlg, *ppBuf2, tsize); + if (pSubBlock->szVersion <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; goto _err; } - cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->szVersion); // TSKEY - pSubBlock->szTSKEY = sizeof(TSKEY) * pBlockData->nRow; - n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->szTSKEY); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + pSubBlock->szTSKEY = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, + pBlockData->nRow, *ppBuf1 + *nDataP + pSubBlock->szVersion, + size - pSubBlock->szVersion, pSubBlock->cmprAlg, *ppBuf2, tsize); + if (pSubBlock->szTSKEY <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; goto _err; } - cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->szTSKEY); - // cksm - size = sizeof(cksm); - n = 
taosWriteFile(pFileFD, (uint8_t *)&cksm, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + ASSERT(pSubBlock->szVersion + pSubBlock->szTSKEY <= size); + } + + // checksum + size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); + taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, size); + + *nDataP += size; + return code; + +_err: + return code; +} + +static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBlock *pSubBlock, uint8_t **ppBuf1, + int64_t *nDataP, uint8_t **ppBuf2) { + int32_t code = 0; + int64_t size; + int64_t n = 0; + + // BITMAP + if (pColData->flag != HAS_VALUE) { + size = BIT2_SIZE(pColData->nVal) + COMP_OVERFLOW_BYTES; + + code = tsdbRealloc(ppBuf1, *nDataP + n + size); + if (code) goto _err; + + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + + pBlockCol->szBitmap = + tsCompressTinyint((char *)pColData->pBitMap, BIT2_SIZE(pColData->nVal), BIT2_SIZE(pColData->nVal), + *ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size); + if (pBlockCol->szBitmap <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; goto _err; } } else { - ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP); + pBlockCol->szBitmap = 0; + } + n += pBlockCol->szBitmap; - size = (sizeof(int64_t) + sizeof(TSKEY)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); + // OFFSET + if (IS_VAR_DATA_TYPE(pColData->type)) { + size = sizeof(int32_t) * pColData->nVal + COMP_OVERFLOW_BYTES; - code = tsdbRealloc(ppBuf1, size); + code = tsdbRealloc(ppBuf1, *nDataP + n + size); if (code) goto _err; - if (cmprAlg == TWO_STAGE_COMP) { - code = tsdbRealloc(ppBuf2, size); - if (code) goto _err; - } + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; - // version - n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, - size, cmprAlg, *ppBuf2, size); - if (n <= 0) { + pBlockCol->szOffset = tsCompressInt((char *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, 
pColData->nVal, + *ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size); + if (pBlockCol->szOffset <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; } - pSubBlock->szVersion = n; + } else { + pBlockCol->szOffset = 0; + } + n += pBlockCol->szOffset; - // TSKEY - n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, - *ppBuf1 + pSubBlock->szVersion, size - pSubBlock->szVersion, cmprAlg, *ppBuf2, size); - if (n <= 0) { + // VALUE + if (pSubBlock->cmprAlg == NO_COMPRESSION) { + pBlockCol->szValue = pColData->nData; + + code = tsdbRealloc(ppBuf1, *nDataP + n + pBlockCol->szValue + sizeof(TSCKSUM)); + if (code) goto _err; + + memcpy(*ppBuf1 + *nDataP + n, pColData->pData, pBlockCol->szValue); + } else { + size = pColData->nData + COMP_OVERFLOW_BYTES; + + code = tsdbRealloc(ppBuf1, *nDataP + n + size + sizeof(TSCKSUM)); + if (code) goto _err; + + if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { + code = tsdbRealloc(ppBuf2, size); + if (code) goto _err; + } + + pBlockCol->szValue = + tDataTypes[pColData->type].compFunc((char *)pColData->pData, pColData->nData, pColData->nVal, + *ppBuf1 + *nDataP + n, size, pSubBlock->cmprAlg, *ppBuf2, size); + if (pBlockCol->szValue <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; } - pSubBlock->szTSKEY = n; + } + n += pBlockCol->szValue; + pBlockCol->szOrigin = pColData->nData; - // cksm - n = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - ASSERT(n <= size); - taosCalcChecksumAppend(0, *ppBuf1, n); + // checksum + n += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, n); - // write - n = taosWriteFile(pFileFD, *ppBuf1, n); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + *nDataP += n; + + return code; + +_err: + return code; +} + +static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SSubBlock *pSubBlock, SBlockDataHdr hdr, SArray *aBlockCol, + uint8_t *pData, int64_t nData, uint8_t **ppBuf) { + int32_t code = 0; + int32_t nBlockCol = 
taosArrayGetSize(aBlockCol); + int64_t size; + int64_t n; + + // HDR + SArray + pSubBlock->szBlockCol = sizeof(hdr); + for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { + pSubBlock->szBlockCol += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol)); + } + + code = tsdbRealloc(ppBuf, pSubBlock->szBlock + sizeof(TSCKSUM)); + if (code) goto _err; + + n = 0; + memcpy(*ppBuf, &hdr, sizeof(hdr)); + n += sizeof(hdr); + for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { + n += tPutBlockCol(*ppBuf + n, taosArrayGet(aBlockCol, iBlockCol)); + } + taosCalcChecksumAppend(0, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM)); + + ASSERT(n == pSubBlock->szBlockCol); + + n = taosWriteFile(pFD, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // SBlockData + n = taosWriteFile(pFD, pData, nData); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return code; + +_err: + return code; +} + +static void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { + SColVal colVal; + SColVal *pColVal = &colVal; + + *pColAgg = (SColumnDataAgg){.colId = pColData->cid}; + for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { + tColDataGetValue(pColData, iVal, pColVal); + + if (pColVal->isNone || pColVal->isNull) { + pColAgg->numOfNull++; + } else { + // TODO: + ASSERT(0); } } - pSubBlock->szBlock += (pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM)); +} - // other columns - offset = 0; - tMapDataReset(&pSubBlock->mBlockCol); - for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) { - SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol); +static int32_t tsdbWriteBlockSMA(TdFilePtr pFD, SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t **ppBuf) { + int32_t code = 0; + int64_t n; + SColData *pColData; - ASSERT(pColData->flag); + // prepare + pSubBlock->nSma = 0; + for (int32_t iColData = 0; iColData < 
taosArrayGetSize(pBlockData->aColDataP); iColData++) { + pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); - if (pColData->flag == HAS_NONE) continue; + if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; - pBlockCol->cid = pColData->cid; - pBlockCol->type = pColData->type; - pBlockCol->flag = pColData->flag; + pSubBlock->nSma++; + } + if (pSubBlock->nSma == 0) goto _exit; - if (pColData->flag != HAS_NULL) { - cksm = 0; - pBlockCol->offset = offset; + // calc + code = tsdbRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); + if (code) goto _err; + n = 0; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) { + pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); - // bitmap - if (pColData->flag == HAS_VALUE) { - pBlockCol->bsize = 0; - } else { - pBlockCol->bsize = BIT2_SIZE(pBlockData->nRow); - n = taosWriteFile(pFileFD, pColData->pBitMap, pBlockCol->bsize); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - cksm = taosCalcChecksum(cksm, pColData->pBitMap, n); - } + if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; - // data - if (cmprAlg == NO_COMPRESSION) { - // data - n = taosWriteFile(pFileFD, pColData->pData, pColData->nData); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - pBlockCol->csize = n; - pBlockCol->osize = n; + tsdbCalcColDataSMA(pColData, &((SColumnDataAgg *)(*ppBuf))[n]); + n++; + } + taosCalcChecksumAppend(0, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); - // checksum - cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData); - n = taosWriteFile(pFileFD, &cksm, sizeof(cksm)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - } else { - size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM); + // write + n = taosWriteFile(pFD, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); + if (n < 0) { + 
code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; +_exit: + return code; - if (cmprAlg == TWO_STAGE_COMP) { - code = tsdbRealloc(ppBuf2, size); - if (code) goto _err; - } +_err: + return code; +} - // data - n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, - cmprAlg, *ppBuf2, size); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - pBlockCol->csize = n; - pBlockCol->osize = pColData->nData; +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, + SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) { + int32_t code = 0; + SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; + SBlockCol blockCol; + SBlockCol *pBlockCol = &blockCol; + int64_t n; + TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD; + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + uint8_t *p; + int64_t nData; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + SArray *aBlockCol = NULL; - // cksm - n += sizeof(TSCKSUM); - ASSERT(n <= size); - taosCalcChecksumAppend(cksm, *ppBuf1, n); + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; - // write - n = taosWriteFile(pFileFD, *ppBuf1, n); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - } + tsdbUpdateBlockInfo(pBlockData, pBlock); - // state - offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - pSubBlock->szBlock = pSubBlock->szBlock + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + pSubBlock->nRow = pBlockData->nRow; + pSubBlock->cmprAlg = cmprAlg; + if (pBlock->last) { + pSubBlock->offset = pWriter->wSet.fLast.size; + } else { + pSubBlock->offset = pWriter->wSet.fData.size; + } + + // ======================= BLOCK DATA ======================= + // TSDBKEY + nData = 0; + code = tsdbWriteBlockDataKey(pSubBlock, 
pBlockData, ppBuf1, &nData, ppBuf2); + if (code) goto _err; + + // COLUMNS + aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aColDataP), sizeof(SBlockCol)); + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + int32_t offset = 0; + for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) { + SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol); + + ASSERT(pColData->flag); + + if (pColData->flag == HAS_NONE) continue; + + pBlockCol->cid = pColData->cid; + pBlockCol->type = pColData->type; + pBlockCol->flag = pColData->flag; + + if (pColData->flag != HAS_NULL) { + code = tsdbWriteColData(pColData, pBlockCol, pSubBlock, ppBuf1, &nData, ppBuf2); + if (code) goto _err; + + pBlockCol->offset = offset; + offset = offset + pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); } - code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol); - if (code) goto _err; + if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } + // write + code = tsdbWriteBlockDataImpl(pFileFD, pSubBlock, hdr, aBlockCol, *ppBuf1, nData, ppBuf2); + if (code) goto _err; + + pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nData; if (pBlock->last) { pWriter->wSet.fLast.size += pSubBlock->szBlock; } else { pWriter->wSet.fData.size += pSubBlock->szBlock; } + // ======================= BLOCK SMA ======================= + pSubBlock->sOffset = 0; + pSubBlock->nSma = 0; + + if (pBlock->nSubBlock > 1 || pBlock->last || pBlock->hasDup) goto _exit; + + code = tsdbWriteBlockSMA(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1); + if (code) goto _err; + + if (pSubBlock->nSma > 0) { + pSubBlock->sOffset = pWriter->wSet.fSma.size; + pWriter->wSet.fSma.size += (sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); + } + +_exit: tsdbFree(pBuf1); tsdbFree(pBuf2); + taosArrayDestroy(aBlockCol); return code; _err: tsdbError("vgId:%d 
write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbFree(pBuf1); tsdbFree(pBuf2); - return code; -} - -int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize) { - int32_t code = 0; - // TODO + taosArrayDestroy(aBlockCol); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index aecf4b65874b1adf01169ae685fed946f8cdbb3e..5d934adf8ea43b2f7ed07fa575a9083ed1d4fe16 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -15,57 +15,15 @@ #include "tsdb.h" -#define TSDB_OFFSET_I32 ((uint8_t)0) -#define TSDB_OFFSET_I16 ((uint8_t)1) -#define TSDB_OFFSET_I8 ((uint8_t)2) - // SMapData ======================================================================= void tMapDataReset(SMapData *pMapData) { - pMapData->flag = TSDB_OFFSET_I32; pMapData->nItem = 0; pMapData->nData = 0; } void tMapDataClear(SMapData *pMapData) { - if (pMapData->pBuf) { - tsdbFree(pMapData->pBuf); - } else { - tsdbFree(pMapData->pOfst); - tsdbFree(pMapData->pData); - } -} - -int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) { - int32_t code = 0; - int32_t size; - - pMapDataDest->nItem = pMapDataSrc->nItem; - pMapDataDest->flag = pMapDataSrc->flag; - - switch (pMapDataDest->flag) { - case TSDB_OFFSET_I32: - size = sizeof(int32_t) * pMapDataDest->nItem; - break; - case TSDB_OFFSET_I16: - size = sizeof(int16_t) * pMapDataDest->nItem; - break; - case TSDB_OFFSET_I8: - size = sizeof(int8_t) * pMapDataDest->nItem; - break; - default: - ASSERT(0); - } - code = tsdbRealloc(&pMapDataDest->pOfst, size); - if (code) goto _exit; - memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size); - - pMapDataDest->nData = pMapDataSrc->nData; - code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData); - if (code) goto _exit; - memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData); - -_exit: - return code; + 
tsdbFree((uint8_t *)pMapData->aOffset); + tsdbFree(pMapData->pData); } int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { @@ -77,35 +35,19 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u pMapData->nData += tPutItemFn(NULL, pItem); // alloc - code = tsdbRealloc(&pMapData->pOfst, sizeof(int32_t) * pMapData->nItem); + code = tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); if (code) goto _err; code = tsdbRealloc(&pMapData->pData, pMapData->nData); if (code) goto _err; // put - ASSERT(pMapData->flag == TSDB_OFFSET_I32); - ((int32_t *)pMapData->pOfst)[nItem] = offset; + pMapData->aOffset[nItem] = offset; tPutItemFn(pMapData->pData + offset, pItem); _err: return code; } -static FORCE_INLINE int32_t tMapDataGetOffset(SMapData *pMapData, int32_t idx) { - switch (pMapData->flag) { - case TSDB_OFFSET_I8: - return ((int8_t *)pMapData->pOfst)[idx]; - break; - case TSDB_OFFSET_I16: - return ((int16_t *)pMapData->pOfst)[idx]; - break; - case TSDB_OFFSET_I32: - return ((int32_t *)pMapData->pOfst)[idx]; - break; - default: - ASSERT(0); - } -} int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { int32_t code = 0; @@ -135,58 +77,25 @@ _exit: return code; } -int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { - int32_t code = 0; - - if (idx < 0 || idx >= pMapData->nItem) { - code = TSDB_CODE_NOT_FOUND; - goto _exit; - } - - tGetItemFn(pMapData->pData + tMapDataGetOffset(pMapData, idx), pItem); - -_exit: - return code; +void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { + ASSERT(idx >= 0 && idx < pMapData->nItem); + tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem); } int32_t tPutMapData(uint8_t *p, SMapData 
*pMapData) { int32_t n = 0; - ASSERT(pMapData->flag == TSDB_OFFSET_I32); - n += tPutI32v(p ? p + n : p, pMapData->nItem); if (pMapData->nItem) { - int32_t maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1); - - if (maxOffset <= INT8_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI8(p + n, (int8_t)tMapDataGetOffset(pMapData, iItem)); - } - } else { - n = n + sizeof(int8_t) * pMapData->nItem; - } - } else if (maxOffset <= INT16_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI16(p + n, (int16_t)tMapDataGetOffset(pMapData, iItem)); - } - } else { - n = n + sizeof(int16_t) * pMapData->nItem; - } - } else { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI32(p + n, tMapDataGetOffset(pMapData, iItem)); - } - } else { - n = n + sizeof(int32_t) * pMapData->nItem; - } + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem]); } - n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData); + + n += tPutI32v(p ? 
p + n : p, pMapData->nData); + if (p) { + memcpy(p + n, pMapData->pData, pMapData->nData); + } + n += pMapData->nData; } return n; @@ -194,26 +103,22 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) { int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { int32_t n = 0; + int32_t offset; + + tMapDataReset(pMapData); n += tGetI32v(p + n, &pMapData->nItem); if (pMapData->nItem) { - n += tGetU8(p + n, &pMapData->flag); - pMapData->pOfst = p + n; - switch (pMapData->flag) { - case TSDB_OFFSET_I8: - n = n + sizeof(int8_t) * pMapData->nItem; - break; - case TSDB_OFFSET_I16: - n = n + sizeof(int16_t) * pMapData->nItem; - break; - case TSDB_OFFSET_I32: - n = n + sizeof(int32_t) * pMapData->nItem; - break; - - default: - ASSERT(0); + if (tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1; + + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tGetI32v(p + n, &pMapData->aOffset[iItem]); } - n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData); + + n += tGetI32v(p + n, &pMapData->nData); + if (tsdbRealloc(&pMapData->pData, pMapData->nData)) return -1; + memcpy(pMapData->pData, p + n, pMapData->nData); + n += pMapData->nData; } return n; @@ -377,55 +282,8 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { // SBlock ====================================================== void tBlockReset(SBlock *pBlock) { - pBlock->minKey = TSDBKEY_MAX; - pBlock->maxKey = TSDBKEY_MIN; - pBlock->minVersion = VERSION_MAX; - pBlock->maxVersion = VERSION_MIN; - pBlock->nRow = 0; - pBlock->last = -1; - pBlock->hasDup = 0; - for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) { - pBlock->aSubBlock[iSubBlock].nRow = 0; - pBlock->aSubBlock[iSubBlock].cmprAlg = -1; - pBlock->aSubBlock[iSubBlock].offset = -1; - pBlock->aSubBlock[iSubBlock].szVersion = -1; - pBlock->aSubBlock[iSubBlock].szTSKEY = -1; - pBlock->aSubBlock[iSubBlock].szBlock = -1; - tMapDataReset(&pBlock->aSubBlock->mBlockCol); - } - 
pBlock->nSubBlock = 0; -} - -void tBlockClear(SBlock *pBlock) { - for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) { - tMapDataClear(&pBlock->aSubBlock->mBlockCol); - } -} - -int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) { - int32_t code = 0; - - pBlockDest->minKey = pBlockSrc->minKey; - pBlockDest->maxKey = pBlockSrc->maxKey; - pBlockDest->minVersion = pBlockSrc->minVersion; - pBlockDest->maxVersion = pBlockSrc->maxVersion; - pBlockDest->nRow = pBlockSrc->nRow; - pBlockDest->last = pBlockSrc->last; - pBlockDest->hasDup = pBlockSrc->hasDup; - pBlockDest->nSubBlock = pBlockSrc->nSubBlock; - for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) { - pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow; - pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg; - pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset; - pBlockDest->aSubBlock[iSubBlock].szVersion = pBlockSrc->aSubBlock[iSubBlock].szVersion; - pBlockDest->aSubBlock[iSubBlock].szTSKEY = pBlockSrc->aSubBlock[iSubBlock].szTSKEY; - pBlockDest->aSubBlock[iSubBlock].szBlock = pBlockSrc->aSubBlock[iSubBlock].szBlock; - code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol); - if (code) goto _exit; - } - -_exit: - return code; + *pBlock = + (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVersion = VERSION_MAX, .maxVersion = VERSION_MIN}; } int32_t tPutBlock(uint8_t *p, void *ph) { @@ -441,13 +299,15 @@ int32_t tPutBlock(uint8_t *p, void *ph) { n += tPutI8(p ? p + n : p, pBlock->hasDup); n += tPutI8(p ? p + n : p, pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI64v(p ? 
p + n : p, pBlock->aSubBlock[iSubBlock].offset); - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion); - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY); - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); - n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlockCol); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].sOffset); + n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nSma); } return n; @@ -466,20 +326,21 @@ int32_t tGetBlock(uint8_t *p, void *ph) { n += tGetI8(p + n, &pBlock->hasDup); n += tGetI8(p + n, &pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion); - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY); - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); - n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlockCol); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].sOffset); + n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nSma); } return n; } int32_t tBlockCmprFn(const void *p1, const void *p2) { - int32_t c; SBlock *pBlock1 = (SBlock *)p1; SBlock *pBlock2 = 
(SBlock *)p2; @@ -504,14 +365,11 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { n += tPutI8(p ? p + n : p, pBlockCol->flag); if (pBlockCol->flag != HAS_NULL) { - n += tPutI64v(p ? p + n : p, pBlockCol->offset); - if (pBlockCol->flag != HAS_VALUE) { - n += tPutI64v(p ? p + n : p, pBlockCol->bsize); - } - n += tPutI64v(p ? p + n : p, pBlockCol->csize); - if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - n += tPutI64v(p ? p + n : p, pBlockCol->osize); - } + n += tPutI32v(p ? p + n : p, pBlockCol->offset); + n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); + n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); + n += tPutI32v(p ? p + n : p, pBlockCol->szValue); + n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); } return n; @@ -528,18 +386,11 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); if (pBlockCol->flag != HAS_NULL) { - n += tGetI64v(p + n, &pBlockCol->offset); - if (pBlockCol->flag != HAS_VALUE) { - n += tGetI64v(p + n, &pBlockCol->bsize); - } else { - pBlockCol->bsize = 0; - } - n += tGetI64v(p + n, &pBlockCol->csize); - if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - n += tGetI64v(p + n, &pBlockCol->osize); - } else { - pBlockCol->osize = -1; - } + n += tGetI32v(p + n, &pBlockCol->offset); + n += tGetI32v(p + n, &pBlockCol->szBitmap); + n += tGetI32v(p + n, &pBlockCol->szOffset); + n += tGetI32v(p + n, &pBlockCol->szValue); + n += tGetI32v(p + n, &pBlockCol->szOrigin); } return n; @@ -942,12 +793,12 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) { } // SColData ======================================== -void tColDataReset(SColData *pColData, int16_t cid, int8_t type) { +void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) { pColData->cid = cid; pColData->type = type; + pColData->smaOn = smaOn; pColData->nVal = 0; pColData->flag = 0; - pColData->offsetValid = 0; pColData->nData = 0; } @@ -977,26 +828,35 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) { if 
(pColVal->isNone) { pColData->flag |= HAS_NONE; SET_BIT2(pColData->pBitMap, pColData->nVal, 0); - if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL; } else if (pColVal->isNull) { pColData->flag |= HAS_NULL; SET_BIT2(pColData->pBitMap, pColData->nVal, 1); - if (IS_VAR_DATA_TYPE(pColData->type)) pValue = NULL; } else { pColData->flag |= HAS_VALUE; SET_BIT2(pColData->pBitMap, pColData->nVal, 2); pValue = &pColVal->value; } - if (pValue) { - code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, &pColVal->value, pColVal->type)); + if (IS_VAR_DATA_TYPE(pColData->type)) { + // offset + code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * (pColData->nVal + 1)); if (code) goto _exit; + pColData->aOffset[pColData->nVal] = pColData->nData; - pColData->nData += tPutValue(pColData->pData + pColData->nData, &pColVal->value, pColVal->type); + // value + if ((!pColVal->isNone) && (!pColVal->isNull)) { + code = tsdbRealloc(&pColData->pData, pColData->nData + pColVal->value.nData); + if (code) goto _exit; + memcpy(pColData->pData + pColData->nData, pColVal->value.pData, pColVal->value.nData); + pColData->nData += pColVal->value.nData; + } + } else { + code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, pValue, pColVal->type)); + if (code) goto _exit; + pColData->nData += tPutValue(pColData->pData + pColData->nData, pValue, pColVal->type); } pColData->nVal++; - pColData->offsetValid = 0; _exit: return code; @@ -1004,57 +864,33 @@ _exit: int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) { int32_t code = 0; + int32_t size; - pColDataDest->cid = pColDataDest->cid; - pColDataDest->type = pColDataDest->type; - pColDataDest->offsetValid = 0; + pColDataDest->cid = pColDataSrc->cid; + pColDataDest->type = pColDataSrc->type; + pColDataDest->smaOn = pColDataSrc->smaOn; pColDataDest->nVal = pColDataSrc->nVal; pColDataDest->flag = pColDataSrc->flag; - if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && 
pColDataSrc->flag != HAS_VALUE) { - code = tsdbRealloc(&pColDataDest->pBitMap, BIT2_SIZE(pColDataDest->nVal)); - if (code) goto _exit; - memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, BIT2_SIZE(pColDataSrc->nVal)); - } - pColDataDest->nData = pColDataSrc->nData; - code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData); + size = BIT2_SIZE(pColDataSrc->nVal); + code = tsdbRealloc(&pColDataDest->pBitMap, size); if (code) goto _exit; - memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataSrc->nData); + memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size); -_exit: - return code; -} + if (IS_VAR_DATA_TYPE(pColDataDest->type)) { + size = sizeof(int32_t) * pColDataSrc->nVal; -static int32_t tColDataUpdateOffset(SColData *pColData) { - int32_t code = 0; - SValue value; - - ASSERT(pColData->nVal > 0); - ASSERT(pColData->flag); - ASSERT(IS_VAR_DATA_TYPE(pColData->type)); - - if ((pColData->flag & HAS_VALUE)) { - code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal); + code = tsdbRealloc((uint8_t **)&pColDataDest->aOffset, size); if (code) goto _exit; - int32_t offset = 0; - for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { - if (pColData->flag != HAS_VALUE) { - uint8_t v = GET_BIT2(pColData->pBitMap, iVal); - if (v == 0 || v == 1) { - pColData->aOffset[iVal] = -1; - continue; - } - } - - pColData->aOffset[iVal] = offset; - offset += tGetValue(pColData->pData + offset, &value, pColData->type); - } - - ASSERT(offset == pColData->nData); - pColData->offsetValid = 1; + memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size); } + code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData); + if (code) goto _exit; + pColDataDest->nData = pColDataSrc->nData; + memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData); + _exit: return code; } @@ -1085,11 +921,13 @@ int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { // get value SValue value; if (IS_VAR_DATA_TYPE(pColData->type)) { - if 
(!pColData->offsetValid) { - code = tColDataUpdateOffset(pColData); - if (code) goto _exit; + if (iVal + 1 < pColData->nVal) { + value.nData = pColData->aOffset[iVal + 1] - pColData->aOffset[iVal]; + } else { + value.nData = pColData->nData - pColData->aOffset[iVal]; } - tGetValue(pColData->pData + pColData->aOffset[iVal], &value, pColData->type); + + value.pData = pColData->pData + pColData->aOffset[iVal]; } else { tGetValue(pColData->pData + tDataTypes[pColData->type].bytes * iVal, &value, pColData->type); } @@ -1210,7 +1048,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS if (code) goto _err; // append a NONE - tColDataReset(pColData, pColVal->cid, pColVal->type); + tColDataReset(pColData, pColVal->cid, pColVal->type, 0); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type)); if (code) goto _err; @@ -1240,7 +1078,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS code = tBlockDataAddColData(pBlockData, iColData, &pColData); if (code) goto _err; - tColDataReset(pColData, pColVal->cid, pColVal->type); + tColDataReset(pColData, pColVal->cid, pColVal->type, 0); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColVal->cid, pColVal->type)); if (code) goto _err;