From 2d503135864431cd143859cb60e7d6dec26895be Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 7 Aug 2022 08:12:59 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 9 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 89 ++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbUtil.c | 43 ++++----- 3 files changed, 95 insertions(+), 46 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index fe3992102b..f746da3857 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -63,6 +63,7 @@ typedef struct SRowMerger SRowMerger; typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; typedef struct SSmaInfo SSmaInfo; +typedef struct SBlockCol SBlockCol; #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_FHDR_SIZE 512 @@ -181,7 +182,8 @@ int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, int32_t *szOut, uint8_t **ppBuf); -int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf); +int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut, + uint8_t **ppBuf); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); @@ -400,7 +402,7 @@ struct SMapData { uint8_t *pData; }; -typedef struct { +struct SBlockCol { int16_t cid; int8_t type; int8_t smaOn; @@ -410,7 +412,7 @@ typedef struct { int32_t szOffset; // offset size, 0 only for non-variant-length type int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE) int32_t offset; -} SBlockCol; +}; struct SBlockInfo { int64_t offset; // block data offset @@ -600,6 +602,7 @@ struct SDataFWriter { 
uint8_t *pBuf1; uint8_t *pBuf2; + uint8_t *pBuf3; }; struct STsdbReadSnap { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 31029e6bb4..416b35aa03 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1414,7 +1414,6 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - code = tDiskDataInit(&pWriter->dData); if (code) goto _err; pWriter->pTsdb = pTsdb; pWriter->wSet = (SDFileSet){.diskId = pSet->diskId, @@ -1594,9 +1593,9 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { goto _err; } - tDiskDataClear(&(*ppWriter)->dData); tFree((*ppWriter)->pBuf1); tFree((*ppWriter)->pBuf2); + tFree((*ppWriter)->pBuf3); taosMemoryFree(*ppWriter); _exit: *ppWriter = NULL; @@ -1905,11 +1904,14 @@ _err: int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast) { - int32_t code = 0; - TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + int32_t code = 0; ASSERT(pBlockData->nRow > 0); + pBlkInfo->offset = toLast ? 
pWriter->fLast.size : pWriter->fData.size; + pBlkInfo->szBlock = 0; + pBlkInfo->szKey = 0; + // ================= DATA ==================== SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockData->suid, @@ -1923,23 +1925,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock goto _err; } - // uid - if (pBlockData->uid == 0) { - ASSERT(toLast); - code = tsdbCmprData(); - if (code) goto _err; - } - - // version - code = tsdbCmprData(); - if (code) goto _err; - - // ts - code = tsdbCmprData(); - if (code) goto _err; - - // columns - int32_t offset = 0; + // encode ================= + int32_t nBuf1 = 0; for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); @@ -1954,15 +1941,71 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock .szOrigin = pColData->nData}; if (pColData->flag != HAS_NULL) { + code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &pWriter->pBuf1, nBuf1, &pWriter->pBuf3); + if (code) goto _err; + + blockCol.offset = nBuf1; + nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); } if (taosArrayPush(aBlockCol, &blockCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + + hdr.szBlkCol += tPutBlockCol(NULL, &blockCol); } - // write + // (uid + version + tskey + aBlockCol) + if (pBlockData->uid == 0) { + code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pWriter->pBuf2, 0, &hdr.szUid, &pWriter->pBuf3); + if (code) goto _err; + } + + code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, &pWriter->pBuf2, hdr.szUid, &hdr.szVer, &pWriter->pBuf3); + if (code) goto _err; + + code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, + cmprAlg, 
&pWriter->pBuf2, hdr.szUid + hdr.szVer, &hdr.szKey, &pWriter->pBuf3); + if (code) goto _err; + + pBlkInfo->szKey = tPutDiskDataHdr(NULL, &hdr); + code = tRealloc(&pWriter->pBuf3, pBlkInfo->szKey); + if (code) goto _err; + tPutDiskDataHdr(pWriter->pBuf3, &hdr); + TSCKSUM cksm = taosCalcChecksum(0, pWriter->pBuf3, pBlkInfo->szKey); + + // write ================= + TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; + + // hdr + int64_t n = taosWriteFile(pFD, pWriter->pBuf3, pBlkInfo->szKey); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // uid + version + tskey + (CKSM) + taosCalcChecksumAppend(cksm, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM)); + n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pBlkInfo->szKey = pBlkInfo->szKey + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM); + + // aBlockCol + + // columns + + // update info + if (toLast) { + pWriter->fLast.size += pBlkInfo->szBlock; + } else { + pWriter->fData.size += pBlkInfo->szBlock; + } // ================= SMA ==================== if (pSmaInfo) { @@ -1971,10 +2014,12 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock } _exit: + taosArrayDestroy(aBlockCol); return code; _err: tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + taosArrayDestroy(aBlockCol); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 1b30a17dd2..8f3bb6e89c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1373,11 +1373,11 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2)); if (c < 0) { - code = tBlockDataAppendRow(pBlockData, &row1, NULL); + // code = 
tBlockDataAppendRow(pBlockData, &row1, NULL); if (code) goto _exit; iRow1++; } else if (c > 0) { - code = tBlockDataAppendRow(pBlockData, &row2, NULL); + // code = tBlockDataAppendRow(pBlockData, &row2, NULL); if (code) goto _exit; iRow2++; } else { @@ -1387,14 +1387,14 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock while (iRow1 < nRow1) { row1 = tsdbRowFromBlockData(pBlockData1, iRow1); - code = tBlockDataAppendRow(pBlockData, &row1, NULL); + // code = tBlockDataAppendRow(pBlockData, &row1, NULL); if (code) goto _exit; iRow1++; } while (iRow2 < nRow2) { row2 = tsdbRowFromBlockData(pBlockData2, iRow2); - code = tBlockDataAppendRow(pBlockData, &row2, NULL); + // code = tBlockDataAppendRow(pBlockData, &row2, NULL); if (code) goto _exit; iRow2++; } @@ -1751,43 +1751,44 @@ _exit: return code; } -int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) { +int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut, + uint8_t **ppBuf) { int32_t code = 0; - int32_t n = 0; + pBlockCol->szBitmap = 0; + pBlockCol->szOffset = 0; + pBlockCol->szValue = 0; + + int32_t size = 0; // bitmap if (pColData->flag != HAS_VALUE) { - code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, - pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf); + code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, ppOut, + nOut + size, &pBlockCol->szBitmap, ppBuf); if (code) goto _exit; - } else { - pBlockCol->szBitmap = 0; } - n += pBlockCol->szBitmap; + size += pBlockCol->szBitmap; // offset if (IS_VAR_DATA_TYPE(pColData->type)) { code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, - pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf); + ppOut, nOut + size, &pBlockCol->szOffset, ppBuf); if (code) goto _exit; - } else { - 
pBlockCol->szOffset = 0; } - n += pBlockCol->szOffset; + size += pBlockCol->szOffset; // value if (pColData->flag != (HAS_NULL | HAS_NONE)) { - code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n, + code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, ppOut, nOut + size, &pBlockCol->szValue, ppBuf); if (code) goto _exit; - } else { - pBlockCol->szValue = 0; } - n += pBlockCol->szValue; + size += pBlockCol->szValue; // checksum - n += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, *ppBuf, n); + size += sizeof(TSCKSUM); + code = tRealloc(ppOut, nOut + size); + if (code) goto _exit; + taosCalcChecksumAppend(0, *ppOut + nOut, size); _exit: return code; -- GitLab