diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index d98e8e9ce346bb0e66243f2eea54f7274432ced4..0a27abdbf97bc310781c5c3fc81824c06115d79d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -53,19 +53,27 @@ typedef struct STsdbDataIter { #define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n))) typedef struct { - STsdb *pTsdb; - int64_t cid; - int32_t maxRows; - int32_t minRows; - STsdbFS fs; - int32_t fid; - SDFileSet *pDFileSet; + STsdb *pTsdb; + int64_t cid; + int32_t maxRows; + int32_t minRows; + STsdbFS fs; + int32_t fid; + SDFileSet *pDFileSet; + + // Reader SDataFReader *pReader; STsdbDataIter *iterList; // list of iterators SRBTree rtree; STsdbDataIter *pIter; SBlockData bData; SSkmInfo tbSkm; + + // Writer + SDataFWriter *pWriter; + SArray *aBlockIdx; // SArray + SMapData mDataBlk; // SMapData + SArray *aSttBlk; // SArray } STsdbCompactor; #define TSDB_FLG_DEEP_COMPACT 0x1 @@ -141,7 +149,7 @@ _exit: _clear_exit: *ppIter = NULL; if (pIter) { - tBlockDataDestroy(&pDataDIter->bData, 1); + tBlockDataDestroy(&pDataDIter->bData); tMapDataClear(&pDataDIter->mDataBlk); taosArrayDestroy(pDataDIter->aBlockIdx); taosMemoryFree(pIter); @@ -191,7 +199,7 @@ _exit: _clear_exit: *ppIter = NULL; if (pIter) { - tBlockDataDestroy(&pSttDIter->bData, 1); + tBlockDataDestroy(&pSttDIter->bData); taosArrayDestroy(pSttDIter->aSttBlk); taosMemoryFree(pIter); } @@ -559,6 +567,33 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { pCompactor->pIter = NULL; tBlockDataReset(&pCompactor->bData); + // open writers + SDFileSet fSet = {0}; // TODO + code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->aBlockIdx == NULL) { + pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pCompactor->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + taosArrayClear(pCompactor->aBlockIdx); + } + + tMapDataReset(&pCompactor->mDataBlk); + + if (pCompactor->aSttBlk == NULL) { + pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pCompactor->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + taosArrayClear(pCompactor->aSttBlk); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -666,6 +701,27 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { TSDB_CHECK_CODE(code, lino, _exit); } + if (pCompactor->bData.nRow > 0) { + // write again + } + + code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->mDataBlk.nItem > 0) { + SBlockIdx *pBlockIdx = taosArrayReserve(pCompactor->aBlockIdx, 1); + if (pBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + tsdbCloseCompactor(pCompactor); }