From 3278c114910c6c476a4e5cea70b7c294adb82e5f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 22 Dec 2022 15:02:39 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 + source/dnode/vnode/src/tsdb/tsdbCompact.c | 101 ++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 1 - source/dnode/vnode/src/vnd/vnodeCompact.c | 13 +++ 4 files changed, 112 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7fb962e3a7..9ac6596c9d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -801,6 +801,8 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { return pIter->pRow; } +int32_t tRowInfoCmprFn(const void *p1, const void *p2); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 244fa1c017..a37af2813e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -38,9 +38,10 @@ typedef struct { } SSttDIter; typedef struct { - int32_t flag; - SRowInfo rowInfo; - char handle[]; + int32_t flag; + SRowInfo rowInfo; + SRBTreeNode n; + char handle[]; } STsdbDataIter; typedef struct { @@ -56,6 +57,13 @@ typedef struct { #define TSDB_FLG_DEEP_COMPACT 0x1 // ITER ========================= +static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { + const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n)); + const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n)); + + return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo); +} + static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) { int32_t code = 0; int32_t lino = 0; @@ -87,18 +95,52 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) goto _exit; } + SDataDIter *pDataDIter = (SDataDIter *)pIter->handle; + pDataDIter->pReader = pReader; + pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pDataDIter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit; + + code = tBlockDataCreate(&pDataDIter->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + // read first data block + pDataDIter->iBlockIdx = 0; + code = tsdbReadDataBlk(pReader, taosArrayGet(pDataDIter->aBlockIdx, pDataDIter->iBlockIdx), &pDataDIter->mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + pDataDIter->iDataBlk = 0; + // code = tsdbReadDataBlock(pReader, tMapDat); + // TSDB_CHECK_CODE(code, lino, _exit); + + pDataDIter->iRow = 0; + // TODO _exit: if (code) { + _clear_exit: *ppIter = NULL; + if (pIter) { + tBlockDataDestroy(&pDataDIter->bData, 1); + tMapDataClear(&pDataDIter->mDataBlk); + taosArrayDestroy(pDataDIter->aBlockIdx); + taosMemoryFree(pIter); + } } else { *ppIter = pIter; } return code; } -static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) { +static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) { int32_t code = 0; int32_t lino = 0; @@ -108,11 +150,40 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) { goto _exit; } + SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; + pSttDIter->pReader = pReader; + pSttDIter->iStt = iStt; + pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pSttDIter->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit; + + code = tBlockDataCreate(&pSttDIter->bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pSttDIter->iSttBlk = 0; + // code = tsdbReadSttBlock(pReader, taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); + // TSDB_CHECK_CODE(code, lino, _exit); + + pSttDIter->iRow = 0; + // TODO _exit: if (code) { + _clear_exit: *ppIter = NULL; + if (pIter) { + tBlockDataDestroy(&pSttDIter->bData, 1); + taosArrayDestroy(pSttDIter->aSttBlk); + taosMemoryFree(pIter); + } } else { *ppIter = pIter; } @@ -227,6 +298,27 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { compactor.fid = compactor.pDFileSet->fid; + code = tsdbDataFReaderOpen(&compactor.pReader, pTsdb, compactor.pDFileSet); + TSDB_CHECK_CODE(code, lino, _exit); + + // open those iterators + tRBTreeCreate(&compactor.rtree, tsdbDataIterCmprFn); + + STsdbDataIter *pIter; + + code = tsdbDataDIterOpen(compactor.pReader, &pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); + + for (int32_t iStt = 0; iStt < compactor.pReader->pSet->nSttF; iStt++) { + code = tsdbSttDIterOpen(compactor.pReader, iStt, &pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); + } + +#if 0 if (flag & TSDB_FLG_DEEP_COMPACT) { code = tsdbDeepCompact(&compactor); TSDB_CHECK_CODE(code, lino, _exit); @@ -234,6 +326,7 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { code = tsdbShallowCompact(&compactor); TSDB_CHECK_CODE(code, lino, _exit); } +#endif } _exit: diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 266fdde2df..384b365890 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -63,7 +63,6 @@ struct STsdbSnapReader { uint8_t* aBuf[5]; }; -extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); diff --git a/source/dnode/vnode/src/vnd/vnodeCompact.c b/source/dnode/vnode/src/vnd/vnodeCompact.c index fa45cb9b51..dfadeadb12 100644 --- a/source/dnode/vnode/src/vnd/vnodeCompact.c +++ b/source/dnode/vnode/src/vnd/vnodeCompact.c @@ -15,13 +15,26 @@ #include "vnd.h" +extern int32_t tsdbCompact(STsdb *pTsdb, int32_t flag); + extern void vnodePrepareCommit(SVnode *pVnode); static int32_t vnodeCompactImpl(SCommitInfo *pInfo) { int32_t code = 0; + int32_t lino = 0; // TODO + SVnode *pVnode = pInfo->pVnode; + + code = tsdbCompact(pVnode->pTsdb, 0); + TSDB_CHECK_CODE(code, lino, _exit); +_exit: + if (code) { + vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code)); + } else { + vDebug("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); + } return code; } -- GitLab