From b25e55cf8142a51692eecc2bb86512ecbf3165ee Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 14 Jun 2022 06:57:07 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 85 ++++++++----------- source/dnode/vnode/src/tsdb/tsdbCommit.c | 100 +++++++---------------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 18 ++++ 3 files changed, 82 insertions(+), 121 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bd6d6ff856..229cac348f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -48,6 +48,7 @@ typedef struct SOffset SOffset; typedef struct SMapData SMapData; typedef struct SColData SColData; typedef struct SColDataBlock SColDataBlock; +typedef struct SBlockSMA SBlockSMA; // tsdbMemTable ============================================================================================== @@ -86,9 +87,24 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); // SDataFWriter typedef struct SDataFWriter SDataFWriter; +int32_t tsdbDataFWriterOpen(SDataFWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDataFWriterClose(SDataFWriter *pWriter); +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize); +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset, + int64_t *rSize); +int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); + // SDataFReader typedef struct SDataFReader SDataFReader; +int32_t tsdbDataFReaderOpen(SDataFReader *pReader, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDataFReaderClose(SDataFReader *pReader); +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf); +int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA); + // SDelFWriter typedef struct SDelFWriter SDelFWriter; @@ -125,29 +141,9 @@ typedef struct SAggrBlkCol SAggrBlkCol; typedef struct SBlockData SBlockData; typedef struct SReadH SReadH; -typedef struct SDFileSetReader SDFileSetReader; -typedef struct SDFileSetWriter SDFileSetWriter; - -// SDFileSetWriter -// int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); -// int32_t tsdbDFileSetWriterClose(SDFileSetWriter *pWriter, int8_t sync); -// int32_t tsdbWriteBlockData(SDFileSetWriter *pWriter, SDataCols *pDataCols, SBlock *pBlock); -// int32_t tsdbWriteSBlockInfo(SDFileSetWriter *pWriter, SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); -// int32_t tsdbWriteSBlockIdx(SDFileSetWriter *pWriter, SBlockIdx *pBlockIdx); - -// SDFileSetReader -// int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet); -// int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader); -// int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray); -// int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo); -// int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis); - -// SDelFWriter - -// SDelFReader - // tsdbUtil.c ============================================================================================== int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); +void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey); int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size); void tsdbFree(uint8_t *pBuf); @@ -284,7 +280,7 @@ struct TSDBROW { }; }; -struct SBlockIdxItem { +struct SBlockIdx { int64_t suid; int64_t uid; TSDBKEY minKey; @@ -295,15 +291,6 @@ struct SBlockIdxItem { int64_t size; }; -struct SBlockIdx { - int64_t suid; - int64_t uid; - uint32_t delimiter; - SOffset offset; - uint32_t nData; - uint8_t *pData; -}; - typedef enum { TSDB_SBLK_VER_0 = 0, TSDB_SBLK_VER_MAX, @@ -365,26 +352,6 @@ struct SBlockData { typedef void SAggrBlkData; // SBlockCol cols[]; -static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) { - if (key < 0) { - return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1); - } else { - return (int)((key / tsTickPerMin[precision] / minutes)); - } -} - -static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { - if (fid >= pRtn->maxFid) { - return 0; - } else if (fid >= pRtn->midFid) { - return 1; - } else if (fid >= pRtn->minFid) { - return 2; - } else { - return -1; - } -} - // ================== TSDB global config extern bool tsdbForceKeepFile; @@ -457,6 +424,7 @@ struct SMapData { uint8_t *pOfst; uint32_t nData; uint8_t *pData; + uint8_t *pBuf; }; struct SColData { @@ -474,6 +442,21 @@ struct SColDataBlock { SColData *aColData; }; +typedef struct { + int16_t colId; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; +} SColSMA; + +struct SBlockSMA { + int32_t nCol; + SColSMA *aColSMA; +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 63cccc420c..bc126650c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -15,9 +15,7 @@ #include "tsdb.h" -typedef struct SCommitter SCommitter; - -struct SCommitter { +typedef struct { STsdb *pTsdb; uint8_t *pBuf1; uint8_t *pBuf2; @@ -29,15 +27,15 @@ struct SCommitter { int8_t precision; int32_t minRow; int32_t maxRow; - TSKEY nextCommitKey; // commit file data + TSKEY nextKey; int32_t commitFid; TSKEY minKey; TSKEY maxKey; SDFileSetReader *pReader; + SMapData oBlockIdx; // SMapData, read from reader SDFileSetWriter *pWriter; - SMapData oBlockIdx; - SMapData nBlockIdx; + SMapData nBlockIdx; // SMapData, build by committer // commit table data STbDataIter iter; STbDataIter *pIter; @@ -57,7 +55,7 @@ struct SCommitter { SDelData delDataNew; SDelIdxItem delIdxItem; /* commit cache */ -}; +} SCommitter; static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter); @@ -81,8 +79,15 @@ _err: int32_t tsdbCommit(STsdb *pTsdb) { int32_t code = 0; - SCommitter commith = {0}; - int fid; + SCommitter commith; + SMemTable *pMemTable = pTsdb->mem; + + // check + if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { // TODO + pTsdb->mem = NULL; + tsdbMemTableDestroy(pMemTable); + goto _exit; + } // start commit code = tsdbStartCommit(pTsdb, &commith); @@ -112,9 +117,11 @@ int32_t tsdbCommit(STsdb *pTsdb) { goto _err; } +_exit: return code; _err: + tsdbEndCommit(&commith, code); tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -122,6 +129,7 @@ _err: static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { int32_t code = 0; + memset(pCommitter, 0, sizeof(*pCommitter)); ASSERT(pTsdb->mem && pTsdb->imem == NULL); // lock(); pTsdb->imem = pTsdb->mem; @@ -133,36 +141,24 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { return code; } -static int32_t tsdbCommitDataStart(SCommitter *pCommitter); -static int32_t tsdbCommitDataImpl(SCommitter *pCommitter); -static int32_t tsdbCommitDataEnd(SCommitter *pCommitter); - static int32_t tsdbCommitData(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - // no data, just return + // check if (pMemTable->nRow == 0) { goto _exit; } - // start - code = tsdbCommitDataStart(pCommitter); - if (code) { - goto _err; - } - - // commit - code = tsdbCommitDataImpl(pCommitter); - if (code) { - goto _err; - } - - // end - code = tsdbCommitDataEnd(pCommitter); - if (code) { - goto _err; + // loop + pCommitter->nextKey = pMemTable->minKey.ts; + while (pCommitter->nextKey < TSKEY_MAX) { + pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); + tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, + &pCommitter->maxKey); + code = tsdbCommitFileData(pCommitter); + if (code) goto _err; } _exit: @@ -359,40 +355,6 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { return code; } -static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - pCommitter->nextCommitKey = pMemTable->minKey.ts; - - return code; -} - -static int32_t tsdbCommitFileData(SCommitter *pCommitter); - -static int32_t tsdbCommitDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - - while (pCommitter->nextCommitKey < TSKEY_MAX) { - pCommitter->commitFid = tsdbKeyFid(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision); - code = tsdbCommitFileData(pCommitter); - if (code) goto _err; - } - -_exit: - return code; - -_err: - return code; -} - -static int32_t tsdbCommitDataEnd(SCommitter *pCommitter) { - int32_t code = 0; - // TODO - return code; -} - static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter); static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter); @@ -430,10 +392,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { SDFileSet *pRSet = NULL; // TODO SDFileSet *pWSet = NULL; // TODO + // memory + tMapDataReset(&pCommitter->oBlockIdx); + tMapDataReset(&pCommitter->nBlockIdx); + // load old - pCommitter->oBlockIdx.nItem = 0; - pCommitter->oBlockIdx.flag = 0; - pCommitter->oBlockIdx.nData = 0; if (pRSet) { code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; @@ -443,9 +406,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } // create new - pCommitter->nBlockIdx.nItem = 0; - pCommitter->nBlockIdx.flag = 0; - pCommitter->nBlockIdx.nData = 0; code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 644c93c7f9..dc599a6e8e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -140,6 +140,7 @@ void tMapDataReset(SMapData *pMapData) { void tMapDataClear(SMapData *pMapData) { tsdbFree(pMapData->pOfst); tsdbFree(pMapData->pData); + tsdbFree(pMapData->pBuf); } int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { @@ -741,6 +742,23 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) { } } +void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) { + *minKey = fid * minutes * tsTickPerMin[precision]; + *maxKey = *minKey + minutes * tsTickPerMin[precision] - 1; +} + +// int tsdFidLevel(int fid, TSKEY now, minute) { +// if (fid >= pRtn->maxFid) { +// return 0; +// } else if (fid >= pRtn->midFid) { +// return 1; +// } else if (fid >= pRtn->minFid) { +// return 2; +// } else { +// return -1; +// } +// } + // TSDBROW ====================================================== void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { // TODO -- GitLab