diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c0326a6446d2c97ae95348078a0a6ae403298c86..859c0c31cee9bcd32ea4615017ae65db236bbd80 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -31,6 +31,9 @@ extern "C" { #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on + +#define TSDB_MAX_SUBBLOCKS 8 + typedef struct TSDBROW TSDBROW; typedef struct TSDBKEY TSDBKEY; typedef struct TABLEID TABLEID; @@ -97,8 +100,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset, - int64_t *rSize); +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx, + SBlock *pBlock); int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); // SDataFReader @@ -153,10 +156,16 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); // SBlock +#define BLOCK_INIT_VAL ((SBlock){}) + int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); +// SBlockData +void tsdbBlockDataReset(SBlockData *pBlockData); +int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); + // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -293,14 +302,21 @@ struct SBlockInfo { int64_t smaSize; }; +typedef struct { + int64_t offset; + int64_t size; +} SSubBlock; + struct SBlock { - TSDBKEY minKey; - TSDBKEY maxKey; - int64_t minVersion; - int64_t maxVersion; - int32_t nRows; - int8_t nBlockInfo; - SBlockInfo blockInfos[]; + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t minVersion; + int64_t maxVersion; + int32_t nRows; + int8_t last; + int8_t hasDup; + int8_t nSubBlock; + SSubBlock sBlocks[TSDB_MAX_SUBBLOCKS]; }; struct SBlockCol { @@ -322,9 +338,7 @@ struct SAggrBlkCol { }; struct SBlockData { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - uint64_t uid; // For recovery usage + int32_t nRow; SBlockCol cols[]; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 678cf34f9c79de62a3c824a056cea557afd6816e..b89baf42882abfff864a364cdf3d735285c42b2a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -348,9 +348,60 @@ _err: #define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey))) -static int32_t tsdbMergeCommit(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { - int32_t code = 0; - // TODO +static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) { + int32_t code = 0; + TSDBROW *pRow; + SBlock block = BLOCK_INIT_VAL; + SBlockData bData; + + if (pBlock == NULL) { + while (true) { + pRow = tsdbTbDataIterGet(pIter); + + if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) { + if (bData.nRow == 0) { + break; + } else { + goto _write_block_data; + } + } + + code = tsdbBlockDataAppendRow(&bData, pRow, NULL /*TODO*/); + if (code) goto _err; + + if (bData.nRow >= pCommitter->maxRow * 4 / 5) { + goto _write_block_data; + } else { + continue; + } + + _write_block_data: + block.last = (bData.nRow > pCommitter->minRow) ? 0 : 1; + code = tsdbWriteBlockData(pCommitter->pWriter, &bData, NULL, pBlockIdx, &block); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock); + if (code) goto _err; + + // reset block and bdata + block = BLOCK_INIT_VAL; + tsdbBlockDataReset(&bData); + } + } else if (pBlock->last) { + // 1. read last block data + // 2. loop to merge memory data and last block data to write to .data file or .last file + } else { + // while (true) { + // pRow = tsdbTbDataIterGet(pIter); + + // if (pRow == NULL) /* code */ + // } + } + + return code; + +_err: + tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -390,7 +441,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl pRow = tsdbTbDataIterGet(pIter); while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock); - code = tsdbMergeCommit(pCommitter, pIter, pBlock); + code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -400,7 +451,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // mem pRow = tsdbTbDataIterGet(pIter); while (!ROW_END(pRow, pCommitter->maxKey)) { - code = tsdbMergeCommit(pCommitter, pIter, NULL); + code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -410,7 +461,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl while (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock); - code = tsdbMergeCommit(pCommitter, NULL, pBlock); + code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock); if (code) goto _err; iBlock++; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index a2edebaf3714f429607ed30c8e7504a76ef2f912..c7c1534aec23a8cfb5d4f94da595ca47c812fdc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -720,8 +720,8 @@ _err: return code; } -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset, - int64_t *rSize) { +int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx, + SBlock *pBlock) { int32_t code = 0; // TODO return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 52295ea0312ea8f9b0ec15024bdfb2b9cdbb50ac..58afb56dca690f15980a7bcb82f4c314112b5f71 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -562,5 +562,12 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr taosArrayDestroy(aSkyline2); } + return code; +} + +// SBlockData ====================================================== +int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + // TODO return code; } \ No newline at end of file