From 85556a3c904ef864b4ecbb9dd084c79415612547 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 7 Aug 2022 06:13:51 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 38 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 92 ++--- source/dnode/vnode/src/tsdb/tsdbUtil.c | 378 ++++++++---------- 3 files changed, 216 insertions(+), 292 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2f440842c1..fe3992102b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -47,7 +47,6 @@ typedef struct SBlockL SBlockL; typedef struct SColData SColData; typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SBlockData SBlockData; -typedef struct SDiskData SDiskData; typedef struct SDelFile SDelFile; typedef struct SHeadFile SHeadFile; typedef struct SDataFile SDataFile; @@ -152,11 +151,6 @@ SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); -// SDiskData -int32_t tDiskDataInit(SDiskData *pDiskData); -void tDiskDataClear(SDiskData *pDiskData); -int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); -int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); // SDiskDataHdr int32_t tPutDiskDataHdr(uint8_t *p, void *ph); int32_t tGetDiskDataHdr(uint8_t *p, void *ph); @@ -185,6 +179,9 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg); int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); +int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, + int32_t *szOut, uint8_t **ppBuf); +int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); @@ -404,16 +401,15 @@ struct SMapData { }; typedef struct { - int16_t cid; - int8_t type; - int8_t smaOn; - int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE - int32_t szOrigin; // original column value size (only save for variant data type) - int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL - int32_t szOffset; // offset size, 0 only for non-variant-length type - int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE) - int32_t offset; - uint8_t **ppData; + int16_t cid; + int8_t type; + int8_t smaOn; + int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE + int32_t szOrigin; // original column value size (only save for variant data type) + int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL + int32_t szOffset; // offset size, 0 only for non-variant-length type + int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE) + int32_t offset; } SBlockCol; struct SBlockInfo { @@ -588,14 +584,6 @@ struct SDelFWriter { uint8_t *pBuf1; }; -struct SDiskData { - SDiskDataHdr hdr; - uint8_t **ppKey; - SArray *aBlockCol; // SArray - int32_t nBuf; - SArray *aBuf; // SArray -}; - struct SDataFWriter { STsdb *pTsdb; SDFileSet wSet; @@ -612,8 +600,6 @@ struct SDataFWriter { uint8_t *pBuf1; uint8_t *pBuf2; - - SDiskData dData; }; struct STsdbReadSnap { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 8933aec100..31029e6bb4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1911,72 +1911,58 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ASSERT(pBlockData->nRow > 0); // ================= DATA ==================== -#if 0 - // convert - code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg); - if (code) goto _err; - - // write the block - if (toLast) { - pBlkInfo->offset = pWriter->fLast.size; - } else { - pBlkInfo->offset = pWriter->fData.size; - } + SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, + .suid = pBlockData->suid, + .uid = pBlockData->uid, + .nRow = pBlockData->nRow, + .cmprAlg = cmprAlg}; - // HDR and KEY - int32_t size = tPutDiskDataHdr(NULL, &pDiskData->hdr); - code = tRealloc(ppBuf, size); - if (code) goto _err; - - tPutDiskDataHdr(*ppBuf, &pDiskData->hdr); - - TSCKSUM cksm = taosCalcChecksum(0, *ppBuf, size); - - int64_t n = taosWriteFile(pFD, *ppBuf, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); + SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - cksm = taosCalcChecksum(cksm, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey); - n = taosWriteFile(pFD, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + // uid + if (pBlockData->uid == 0) { + ASSERT(toLast); + code = tsdbCmprData(); + if (code) goto _err; } - n = taosWriteFile(pFD, &cksm, sizeof(cksm)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + // version + code = tsdbCmprData(); + if (code) goto _err; - pBlkInfo->szKey = size + pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey + sizeof(TSCKSUM); + // ts + code = tsdbCmprData(); + if (code) goto _err; - // SBlockCol - if (pDiskData->hdr.szBlkCol == 0) { - pBlkInfo->szBlock = pBlkInfo->szKey; - goto _write_sma; - } + // columns + int32_t offset = 0; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - code = tRealloc(ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); - if (code) goto _err; + ASSERT(pColData->flag); - n = 0; - for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { - n += tPutBlockCol(*ppBuf + n, taosArrayGet(pDiskData->aBlockCol, iBlockCol)); - } - taosCalcChecksumAppend(0, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); + if (pColData->flag == HAS_NONE) continue; - n = taosWriteFile(pFD, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM)); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + SBlockCol blockCol = {.cid = pColData->cid, + .type = pColData->type, + .smaOn = pColData->smaOn, + .flag = pColData->flag, + .szOrigin = pColData->nData}; - for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) { + if (pColData->flag != HAS_NULL) { + } + + if (taosArrayPush(aBlockCol, &blockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } -#endif + + // write // ================= SMA ==================== if (pSmaInfo) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 0ec922f20e..1b30a17dd2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1509,219 +1509,6 @@ int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) { return n; } -// SDiskData ============================== -static int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, - int32_t *szOut, uint8_t **ppBuf) { - int32_t code = 0; - - ASSERT(szIn > 0 && ppOut); - - if (cmprAlg == NO_COMPRESSION) { - code = tRealloc(ppOut, nOut + szIn); - if (code) goto _exit; - - memcpy(*ppOut + nOut, pIn, szIn); - *szOut = szIn; - } else { - int32_t size = szIn + COMP_OVERFLOW_BYTES; - - code = tRealloc(ppOut, nOut + size); - if (code) goto _exit; - - if (cmprAlg == TWO_STAGE_COMP) { - ASSERT(ppBuf); - code = tRealloc(ppBuf, size); - if (code) goto _exit; - } - - *szOut = - tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size); - if (*szOut <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _exit; - } - } - -_exit: - return code; -} - -static int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) { - int32_t code = 0; - - int32_t n = 0; - // bitmap - if (pColData->flag != HAS_VALUE) { - code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, - pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf); - if (code) goto _exit; - } else { - pBlockCol->szBitmap = 0; - } - n += pBlockCol->szBitmap; - - // offset - if (IS_VAR_DATA_TYPE(pColData->type)) { - code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, - pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf); - if (code) goto _exit; - } else { - pBlockCol->szOffset = 0; - } - n += pBlockCol->szOffset; - - // value - if (pColData->flag != (HAS_NULL | HAS_NONE)) { - code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n, - &pBlockCol->szValue, ppBuf); - if (code) goto _exit; - } else { - pBlockCol->szValue = 0; - } - n += pBlockCol->szValue; - - // checksum - n += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, *ppBuf, n); - -_exit: - return code; -} - -static int32_t tsdbDecmprData() { - int32_t code = 0; - // TODO - return code; -} - -int32_t tDiskDataInit(SDiskData *pDiskData) { - int32_t code = 0; - - pDiskData->aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); - if (pDiskData->aBlockCol == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - pDiskData->aBuf = taosArrayInit(0, sizeof(uint8_t *)); - if (pDiskData->aBuf == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - -_exit: - return code; -} - -void tDiskDataClear(SDiskData *pDiskData) { - taosArrayDestroy(pDiskData->aBlockCol); - for (int32_t i = 0; i < taosArrayGetSize(pDiskData->aBuf); i++) { - tFree((uint8_t *)taosArrayGet(pDiskData->aBuf, i)); - } - taosArrayDestroy(pDiskData->aBuf); -} - -static uint8_t **tDiskDataAllocBuf(SDiskData *pDiskData) { - if (pDiskData->nBuf >= taosArrayGetSize(pDiskData->aBuf)) { - uint8_t *p = NULL; - if (taosArrayPush(pDiskData->aBuf, &p) == NULL) { - return NULL; - } - } - - ASSERT(pDiskData->nBuf < taosArrayGetSize(pDiskData->aBuf)); - uint8_t **pp = taosArrayGet(pDiskData->aBuf, pDiskData->nBuf); - pDiskData->nBuf++; - return pp; -} - -int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) { - int32_t code = 0; - - ASSERT(pBlockData->nRow > 0); - - pDiskData->cmprAlg = cmprAlg; - pDiskData->nRow = pBlockData->nRow; - pDiskData->suid = pBlockData->suid; - pDiskData->uid = pBlockData->uid; - pDiskData->szUid = 0; - pDiskData->szVer = 0; - pDiskData->szKey = 0; - taosArrayClear(pDiskData->aBlockCol); - pDiskData->nBuf = 0; - - { - pDiskData->ppKey = tDiskDataAllocBuf(pDiskData); - if (pDiskData->ppKey == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - int32_t n = 0; - // uid - if (pDiskData->uid == 0) { - code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, - cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL); - if (code) goto _exit; - } else { - pDiskData->szUid = 0; - } - n += pDiskData->szUid; - - // version - code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, - cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL); - if (code) goto _exit; - n += pDiskData->szVer; - - // tskey - code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, - cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL); - if (code) goto _exit; - } - - // columns - int32_t offset = 0; - for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - - if (pColData->flag == HAS_NONE) continue; - - SBlockCol blockCol = {.cid = pColData->cid, - .type = pColData->type, - .smaOn = pColData->smaOn, - .flag = pColData->flag, - .szOrigin = pColData->nData}; - - if (pColData->flag != HAS_NULL) { - // alloc a buffer - blockCol.ppData = tDiskDataAllocBuf(pDiskData); - if (blockCol.ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // compress - code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL); - if (code) goto _exit; - - // update offset - blockCol.offset = offset; - offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue; - } - - if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - -_exit: - return code; -} -int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); -int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); - // SDiskDataHdr ============================== int32_t tPutDiskDataHdr(uint8_t *p, void *ph) { int32_t n = 0; @@ -1927,3 +1714,168 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } } } + +int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, + int32_t *szOut, uint8_t **ppBuf) { + int32_t code = 0; + + ASSERT(szIn > 0 && ppOut); + + if (cmprAlg == NO_COMPRESSION) { + code = tRealloc(ppOut, nOut + szIn); + if (code) goto _exit; + + memcpy(*ppOut + nOut, pIn, szIn); + *szOut = szIn; + } else { + int32_t size = szIn + COMP_OVERFLOW_BYTES; + + code = tRealloc(ppOut, nOut + size); + if (code) goto _exit; + + if (cmprAlg == TWO_STAGE_COMP) { + ASSERT(ppBuf); + code = tRealloc(ppBuf, size); + if (code) goto _exit; + } + + *szOut = + tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size); + if (*szOut <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _exit; + } + } + +_exit: + return code; +} + +int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) { + int32_t code = 0; + + int32_t n = 0; + // bitmap + if (pColData->flag != HAS_VALUE) { + code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, + pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szBitmap = 0; + } + n += pBlockCol->szBitmap; + + // offset + if (IS_VAR_DATA_TYPE(pColData->type)) { + code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, + pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szOffset = 0; + } + n += pBlockCol->szOffset; + + // value + if (pColData->flag != (HAS_NULL | HAS_NONE)) { + code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n, + &pBlockCol->szValue, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szValue = 0; + } + n += pBlockCol->szValue; + + // checksum + n += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, *ppBuf, n); + +_exit: + return code; +} + +#if 0 +int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) { + int32_t code = 0; + + ASSERT(pBlockData->nRow > 0); + + pDiskData->cmprAlg = cmprAlg; + pDiskData->nRow = pBlockData->nRow; + pDiskData->suid = pBlockData->suid; + pDiskData->uid = pBlockData->uid; + pDiskData->szUid = 0; + pDiskData->szVer = 0; + pDiskData->szKey = 0; + taosArrayClear(pDiskData->aBlockCol); + pDiskData->nBuf = 0; + + { + pDiskData->ppKey = tDiskDataAllocBuf(pDiskData); + if (pDiskData->ppKey == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + int32_t n = 0; + // uid + if (pDiskData->uid == 0) { + code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL); + if (code) goto _exit; + } else { + pDiskData->szUid = 0; + } + n += pDiskData->szUid; + + // version + code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL); + if (code) goto _exit; + n += pDiskData->szVer; + + // tskey + code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, + cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL); + if (code) goto _exit; + } + + // columns + int32_t offset = 0; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + + if (pColData->flag == HAS_NONE) continue; + + SBlockCol blockCol = {.cid = pColData->cid, + .type = pColData->type, + .smaOn = pColData->smaOn, + .flag = pColData->flag, + .szOrigin = pColData->nData}; + + if (pColData->flag != HAS_NULL) { + // alloc a buffer + blockCol.ppData = tDiskDataAllocBuf(pDiskData); + if (blockCol.ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // compress + code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL); + if (code) goto _exit; + + // update offset + blockCol.offset = offset; + offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue; + } + + if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + +_exit: + return code; +} +#endif -- GitLab