From 8bd8ff152390f1d7100c316342f2ca339076eb00 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 27 Sep 2022 18:59:29 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 15 ++- source/dnode/vnode/src/tsdb/tsdbCommit.c | 27 ++--- source/dnode/vnode/src/tsdb/tsdbDiskData.c | 25 +++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 106 ++++++++++++------ source/dnode/vnode/src/tsdb/tsdbUtil.c | 5 +- 5 files changed, 111 insertions(+), 67 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a928cc725d..2a627af324 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -74,6 +74,7 @@ typedef struct SLDataIter SLDataIter; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; typedef struct SDiskDataBuilder SDiskDataBuilder; +typedef struct SBlkInfo SBlkInfo; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 @@ -173,7 +174,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut int32_t aBufN[]); int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]); // SDiskDataHdr -int32_t tPutDiskDataHdr(uint8_t *p, void *ph); +int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr); int32_t tGetDiskDataHdr(uint8_t *p, void *ph); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); @@ -270,6 +271,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); +int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader @@ -328,7 +330,6 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB uint8_t calcSma); int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder); int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId); -int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo *pBlkInfo); int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo); // structs ======================= @@ -451,14 +452,16 @@ struct SSmaInfo { int32_t size; }; -typedef struct { +struct SBlkInfo { int64_t minUid; int64_t maxUid; - TSDBKEY minKey; - TSDBKEY maxKey; + TSKEY minKey; + TSKEY maxKey; int64_t minVer; int64_t maxVer; -} SBlkInfo; + TSDBKEY minTKey; + TSDBKEY maxTKey; +}; struct SDataBlk { TSDBKEY minKey; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7bc335d034..1a0534df5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -542,6 +542,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->dWriter.aSttBlk); tMapDataReset(&pCommitter->dWriter.mBlock); tBlockDataReset(&pCommitter->dWriter.bData); + tDiskDataBuilderClear(pCommitter->dWriter.pBuilder); // open iter code = tsdbOpenCommitIter(pCommitter); @@ -655,21 +656,21 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde if (pBuilder->nRow == 0) return code; - SSttBlk sttBlk = {.suid = pBuilder->suid, - .minUid = 0, // todo - .maxUid = 0, // todo - .minKey = 0, // todo - .maxKey = 0, // todo - .minVer = 0, // todo - .maxVer = 0, // todo - .nRow = pBuilder->nRow}; - // gnrt - // code = tGnrtDiskData(pBuilder, &pBuilder->dd); + const SDiskData *pDiskData; + const SBlkInfo *pBlkInfo; + code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo); TSDB_CHECK_CODE(code, lino, _exit); + SSttBlk sttBlk = {.suid = pBuilder->suid, + .minUid = pBlkInfo->minUid, + .maxUid = pBlkInfo->maxUid, + .minKey = pBlkInfo->minKey, + .maxKey = pBlkInfo->maxKey, + .minVer = pBlkInfo->minVer, + .maxVer = pBlkInfo->maxVer}; // write - // code = tsdbWriteDiskData(pWriter, &pBuilder->dd); + code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL); TSDB_CHECK_CODE(code, lino, _exit); // push @@ -678,8 +679,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde TSDB_CHECK_CODE(code, lino, _exit); } - // clear (todo) - // tDiskDataBuilderClear(pBuilder); + // clear + tDiskDataBuilderClear(pBuilder); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index d3ea4f1dda..43be51a694 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -498,10 +498,12 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB pBuilder->calcSma = calcSma; pBuilder->bi = (SBlkInfo){.minUid = INT64_MAX, .maxUid = INT64_MIN, - .minKey = TSDBKEY_MAX, - .maxKey = TSDBKEY_MIN, + .minKey = TSKEY_MAX, + .maxKey = TSKEY_MIN, .minVer = VERSION_MAX, - .maxVer = VERSION_MIN}; + .maxVer = VERSION_MIN, + .minTKey = TSDBKEY_MAX, + .maxTKey = TSDBKEY_MIN}; if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code; code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); @@ -551,6 +553,7 @@ int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder) { int32_t code = 0; pBuilder->suid = 0; pBuilder->uid = 0; + pBuilder->nRow = 0; return code; } @@ -560,7 +563,9 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS ASSERT(pBuilder->suid || pBuilder->uid); ASSERT(pId->suid == pBuilder->suid); - TSDBKEY key = TSDBROW_KEY(pRow); + TSDBKEY kRow = TSDBROW_KEY(pRow); + if (tsdbKeyCmprFn(&pBuilder->bi.minTKey, &kRow) > 0) pBuilder->bi.minTKey = kRow; + if (tsdbKeyCmprFn(&pBuilder->bi.maxTKey, &kRow) < 0) pBuilder->bi.maxTKey = kRow; // uid if (pBuilder->uid && pBuilder->uid != pId->uid) { @@ -579,16 +584,16 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS if (pBuilder->bi.maxUid < pId->uid) pBuilder->bi.maxUid = pId->uid; // version - code = tCompress(pBuilder->pVerC, &key.version, sizeof(int64_t)); + code = tCompress(pBuilder->pVerC, &kRow.version, sizeof(int64_t)); if (code) return code; - if (pBuilder->bi.minVer > key.version) pBuilder->bi.minVer = key.version; - if (pBuilder->bi.maxVer < key.version) pBuilder->bi.maxVer = key.version; + if (pBuilder->bi.minVer > kRow.version) pBuilder->bi.minVer = kRow.version; + if (pBuilder->bi.maxVer < kRow.version) pBuilder->bi.maxVer = kRow.version; // TSKEY - code = tCompress(pBuilder->pKeyC, &key.ts, sizeof(int64_t)); + code = tCompress(pBuilder->pKeyC, &kRow.ts, sizeof(int64_t)); if (code) return code; - if (tsdbKeyCmprFn(&pBuilder->bi.minKey, &key) > 0) pBuilder->bi.minKey = key; - if (tsdbKeyCmprFn(&pBuilder->bi.maxKey, &key) < 0) pBuilder->bi.maxKey = key; + if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts; + if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; SRowIter iter = {0}; tRowIterInit(&iter, pRow, pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 4352595d88..00b6830fbc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -522,9 +522,6 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, // write if (pSmaInfo->size) { - code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size); - if (code) goto _err; - code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size); if (code) goto _err; @@ -607,12 +604,20 @@ _err: return code; } -int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInfo *pBlkInfo) { +int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo) { int32_t code = 0; int32_t lino = 0; - STsdbFD *pFD = pWriter->pDataFD; // todo - int64_t offset = pWriter->fData.size; + STsdbFD *pFD = NULL; + if (pSmaInfo) { + pFD = pWriter->pDataFD; + pBlkInfo->offset = pWriter->fData.size; + } else { + pFD = pWriter->pSttFD; + pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size; + } + pBlkInfo->szBlock = 0; + pBlkInfo->szKey = 0; // hdr int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr); @@ -621,26 +626,30 @@ int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInf tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr); - code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], n); + code = tsdbWriteFile(pFD, pBlkInfo->offset, pWriter->aBuf[0], n); TSDB_CHECK_CODE(code, lino, _exit); - offset += n; + pBlkInfo->szKey += n; + pBlkInfo->szBlock += n; // uid + ver + key - if (pDiskData->hdr.szUid) { - code = tsdbWriteFile(pFD, offset, pDiskData->pUid, pDiskData->hdr.szUid); + if (pDiskData->pUid) { + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pUid, pDiskData->hdr.szUid); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskData->hdr.szUid; + pBlkInfo->szKey += pDiskData->hdr.szUid; + pBlkInfo->szBlock += pDiskData->hdr.szUid; } - code = tsdbWriteFile(pFD, offset, pDiskData->pVer, pDiskData->hdr.szVer); + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pVer, pDiskData->hdr.szVer); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskData->hdr.szVer; + pBlkInfo->szKey += pDiskData->hdr.szVer; + pBlkInfo->szBlock += pDiskData->hdr.szVer; - code = tsdbWriteFile(pFD, offset, pDiskData->pKey, pDiskData->hdr.szKey); + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pKey, pDiskData->hdr.szKey); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskData->hdr.szKey; + pBlkInfo->szKey += pDiskData->hdr.szKey; + pBlkInfo->szBlock += pDiskData->hdr.szKey; - // SBlockCol + // aBlockCol if (pDiskData->hdr.szBlkCol) { code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol); TSDB_CHECK_CODE(code, lino, _exit); @@ -648,48 +657,75 @@ int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInf n = 0; for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); - - n += tPutBlockCol(pWriter->aBuf[0] + n, &pDiskCol->bCol); + n += tPutBlockCol(pWriter->aBuf[0] + n, pDiskCol); } - ASSERT(n == pDiskData->hdr.szBlkCol); - code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], pDiskData->hdr.szBlkCol); + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pWriter->aBuf[0], pDiskData->hdr.szBlkCol); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskData->hdr.szBlkCol; + pBlkInfo->szBlock += pDiskData->hdr.szBlkCol; } - // pData + // aDiskCol for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); - if (pDiskCol->bCol.flag == HAS_NULL) continue; - - if (pDiskCol->bCol.szBitmap) { - code = tsdbWriteFile(pFD, offset, pDiskCol->pBit, pDiskCol->bCol.szBitmap); + if (pDiskCol->pBit) { + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pBit, pDiskCol->bCol.szBitmap); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskCol->bCol.szBitmap; + + pBlkInfo->szBlock += pDiskCol->bCol.szBitmap; } - if (pDiskCol->bCol.szOffset) { - code = tsdbWriteFile(pFD, offset, pDiskCol->pOff, pDiskCol->bCol.szOffset); + if (pDiskCol->pOff) { + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pOff, pDiskCol->bCol.szOffset); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskCol->bCol.szOffset; + + pBlkInfo->szBlock += pDiskCol->bCol.szOffset; } - if (pDiskCol->bCol.szValue) { - code = tsdbWriteFile(pFD, offset, pDiskCol->pVal, pDiskCol->bCol.szValue); + if (pDiskCol->pVal) { + code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pVal, pDiskCol->bCol.szValue); TSDB_CHECK_CODE(code, lino, _exit); - offset += pDiskCol->bCol.szValue; + + pBlkInfo->szBlock += pDiskCol->bCol.szValue; } } + if (pSmaInfo) { + pWriter->fData.size += pBlkInfo->szBlock; + } else { + pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock; + goto _exit; + } + + pSmaInfo->offset = 0; + pSmaInfo->size = 0; + for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + + if (IS_VAR_DATA_TYPE(pDiskCol->bCol.type)) continue; + if (pDiskCol->bCol.flag == HAS_NULL || pDiskCol->bCol.flag == (HAS_NULL | HAS_NONE)) continue; + if (!pDiskCol->bCol.smaOn) continue; + + code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &pDiskCol->agg)); + TSDB_CHECK_CODE(code, lino, _exit); + pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &pDiskCol->agg); + } + + if (pSmaInfo->size) { + pSmaInfo->offset = pWriter->fSma.size; + + code = tsdbWriteFile(pWriter->pSmaFD, pSmaInfo->offset, pWriter->aBuf[0], pSmaInfo->size); + TSDB_CHECK_CODE(code, lino, _exit); + + pWriter->fSma.size += pSmaInfo->size; + } + _exit: if (code) { tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s", TD_VID(pWriter->pTsdb->pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8b95d15407..47c836d9c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1454,9 +1454,8 @@ _exit: } // SDiskDataHdr ============================== -int32_t tPutDiskDataHdr(uint8_t *p, void *ph) { - int32_t n = 0; - SDiskDataHdr *pHdr = (SDiskDataHdr *)ph; +int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) { + int32_t n = 0; n += tPutU32(p ? p + n : p, pHdr->delimiter); n += tPutU32v(p ? p + n : p, pHdr->fmtVer); -- GitLab