From e827d0bb7a5e58643f38bd21b6a28a2a5ff28c90 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 3 Aug 2022 08:50:36 +0000 Subject: [PATCH] more last file refact --- source/dnode/vnode/src/inc/tsdb.h | 13 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 488 +++++++++--------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 34 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 2 + 4 files changed, 283 insertions(+), 254 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 8a6f473dc4..af256c0e19 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -233,6 +233,12 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); + +/* new */ +int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, + uint8_t **ppBuf2, int8_t cmprAlg); +int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1, + uint8_t **ppBuf2, int8_t cmprAlg); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); @@ -244,6 +250,12 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf); + +/* new */ +int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2); +int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); @@ -457,6 +469,7 @@ struct SColData { }; struct SBlockData { + int64_t suid; int32_t nRow; int64_t *aUid; int64_t *aVersion; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 704aa0aab0..70892acb0a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -56,8 +56,8 @@ typedef struct { SBlockL *pBlockL; SBlockData bDatal; int32_t iRow; - SRowInfo *pRow; - SRowInfo row; + SRowInfo *pRowInfo; + SRowInfo rowInfo; } dReader; struct { SDataFWriter *pWriter; @@ -290,12 +290,46 @@ _err: return code; } +static int32_t tsdbCommitNextLastRow(SCommitter *pCommitter) { + int32_t code = 0; + + ASSERT(pCommitter->dReader.pReader); + ASSERT(pCommitter->dReader.pRowInfo); + + pCommitter->dReader.iRow++; + if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { + pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; + pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + } else { + pCommitter->dReader.iBlockL++; + if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { + pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); + code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, &pCommitter->dReader.bDatal, + NULL, NULL); + if (code) goto _exit; + + pCommitter->dReader.iRow = 0; + pCommitter->dReader.pRowInfo->suid = pCommitter->dReader.pBlockL->suid; + pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; + pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + } else { + pCommitter->dReader.pRowInfo = NULL; + } + } + +_exit: + return code; +} + static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SDFileSet *pRSet = NULL; // memory + pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); + tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, + &pCommitter->maxKey); pCommitter->nextKey = TSKEY_MAX; // Reader @@ -325,25 +359,15 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); if (code) goto _err; - pCommitter->dReader.iBlockL = 0; - if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); - - // TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL, - // NULL); - if (code) goto _err; - - pCommitter->dReader.iRow = 0; - pCommitter->dReader.pRow = &pCommitter->dReader.row; - pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid; - pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; - pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); - } else { - pCommitter->dReader.pRow = NULL; - } + pCommitter->dReader.iBlockL = -1; + pCommitter->dReader.bDatal.nRow = 0; + pCommitter->dReader.iRow = -1; + pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo; + code = tsdbCommitNextLastRow(pCommitter); + if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; - pCommitter->dReader.pRow = NULL; + pCommitter->dReader.pRowInfo = NULL; } // Writer @@ -463,6 +487,46 @@ _err: return code; } +static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { + int32_t code = 0; + SBlock block; + + ASSERT(pCommitter->dWriter.bData.nRow > 0); + + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL, + pCommitter->cmprAlg); + if (code) goto _exit; + + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); + if (code) goto _exit; + + tBlockDataClearData(&pCommitter->dWriter.bData); + +_exit: + return code; +} + +static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { + int32_t code = 0; + SBlockL blockL; + + ASSERT(pCommitter->dWriter.bDatal.nRow > 0); + + code = tsdbWriteLastBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, &blockL, NULL, NULL, + pCommitter->cmprAlg); + if (code) goto _exit; + + if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + tBlockDataClearData(&pCommitter->dWriter.bDatal); + +_exit: + return code; +} + static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey, int8_t toDataOnly) { int32_t code = 0; @@ -675,22 +739,16 @@ _err: // return code; // } -static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { - int32_t nRow = 0; - TSDBROW *pRow; - TSDBKEY key; - int32_t c = 0; - STbDataIter iter = *pIter; +static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { + int32_t nRow = 0; - iter.pRow = NULL; + STbDataIter iter = *pIter; while (true) { - pRow = tsdbTbDataIterGet(&iter); - + TSDBROW *pRow = tsdbTbDataIterGet(&iter); if (pRow == NULL) break; - key = TSDBROW_KEY(pRow); - c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock); - if (c == 0) { + int32_t c = tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &key); + if (c < 0) { nRow++; tsdbTbDataIterNext(&iter); } else if (c > 0) { @@ -750,11 +808,106 @@ _err: return code; } +static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter, int32_t *nRow) { + int32_t code = 0; + STbData *pTbData = pIter->pTbData; + TSDBROW *pRow = tsdbTbDataIterGet(pIter); + SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo; + + while (pRow && pRowInfo) { + int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); + if (c < 0) { + code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL); + if (code) goto _err; + + pCommitter->dReader.iRow++; + if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { + // todo + } else { + pCommitter->dReader.iBlockL++; + if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { + // todo + } else { + // todo + } + } + + pRowInfo = pCommitter->dReader.pRowInfo; + if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + pRowInfo = NULL; + } + } else { + ASSERT(0); + } + + (*nRow)--; + ASSERT(*nRow >= 0); + if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; + } + + while (pRow) { + code = tBlockDataAppendRow(&pCommitter->dWriter.bData, pRow, NULL); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + + (*nRow)--; + ASSERT(*nRow >= 0); + if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; + } + + while (pRowInfo) { + code = tBlockDataAppendRow(&pCommitter->dWriter.bData, &pRowInfo->row, NULL); + if (code) goto _err; + + pCommitter->dReader.iRow++; + if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { + // todo + } else { + pCommitter->dReader.iBlockL++; + if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { + // todo + } else { + // todo + } + } + + pRowInfo = pCommitter->dReader.pRowInfo; + if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + pRowInfo = NULL; + } + + (*nRow)--; + ASSERT(*nRow >= 0); + if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow * 4 / 5) goto _write_block_data; + } + + SBlock block; +_write_block_data: + return code; + +_err: + return code; +} + static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { int32_t code = 0; ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); - ASSERT(pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, pTbData) >= 0); + ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0); // end last if need if (pTbData->suid == 0 || pTbData->suid != 0 /*todo*/) { @@ -816,7 +969,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { pRow = NULL; } } else { // merge - int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock); + int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey); ASSERT(nOvlp > 0); @@ -854,13 +1007,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { } } - if (pRow) { - code = - tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; + // merge with last + int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); + if (pCommitter->dReader.pRowInfo) { + for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { + int64_t uid = pCommitter->dReader.bDatal.aUid[iRow]; + if (uid == pTbData->uid) { + nRowLeft++; + } + } + } - pRow = tsdbTbDataIterGet(pIter); - ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey); + while (nRowLeft) { + code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft); + if (code) goto _err; } // end @@ -888,174 +1048,6 @@ _err: return code; } -#if 0 -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { - int32_t code = 0; - STbDataIter iter = {0}; - STbDataIter *pIter = &iter; - TSDBROW *pRow; - int32_t iBlock; - int32_t nBlock; - int64_t suid; - int64_t uid; - SBlockIdx *pBlockIdx = NULL; - - if (pTbData) { - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; - - suid = pTbData->suid; - uid = pTbData->uid; - } else { - pIter = NULL; - pRow = NULL; - } - - if (pBlockIdx) { - code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL); - if (code) goto _err; - - nBlock = pCommitter->dReader.mBlock.nItem; - ASSERT(nBlock > 0); - - suid = pBlockIdx->suid; - uid = pBlockIdx->uid; - } else { - nBlock = 0; - } - - if (pRow == NULL && nBlock == 0) goto _exit; - - // start =========== - tMapDataReset(&pCommitter->dWriter.mBlock); - SBlock block; - SBlock *pBlock = █ - - iBlock = 0; - if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - - if (pRow) { - code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer); - if (code) goto _err; - } - - // merge =========== - while (true) { - if (pRow == NULL && pBlock == NULL) break; - - if (pRow && pBlock) { - if (pBlock->last) { - code = tsdbMergeTableData(pCommitter, pIter, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; - - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; - iBlock++; - if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - - ASSERT(pRow == NULL && pBlock == NULL); - } else { - int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock); - if (c > 0) { - // only disk data - code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); - if (code) goto _err; - - iBlock++; - if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } else if (c < 0) { - // only memory data - code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); - if (code) goto _err; - - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; - } else { - // merge memory and disk - int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock); - ASSERT(nOvlp); - if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { - code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); - if (code) goto _err; - } else { - TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}; - int8_t toDataOnly = 0; - - if (iBlock < nBlock - 1) { - toDataOnly = 1; - - SBlock nextBlock = {0}; - tBlockReset(&nextBlock); - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock); - toKey = nextBlock.minKey; - } - - code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly); - if (code) goto _err; - } - - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; - iBlock++; - if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } - } - } else if (pBlock) { - code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); - if (code) goto _err; - - iBlock++; - if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } else { - code = - tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; - - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; - ASSERT(pRow == NULL); - } - } - - // end ===================== - code = tsdbCommitTableDataEnd(pCommitter, suid, uid); - if (code) goto _err; - -_exit: - if (pIter) { - pRow = tsdbTbDataIterGet(pIter); - if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - } - return code; - -_err: - tsdbError("vgId:%d, tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} -#endif - static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; @@ -1122,52 +1114,53 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { } // last - SBlockL blockL; while (true) { - if (pCommitter->dReader.pRow == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRow, &toTable) >= 0) break; + if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break; - // check if same suid - if (pCommitter->dReader.pRow->suid == 0) { - if (pCommitter->dReader.pRow->uid != 0 /*todo*/) { - // code = tsdbCommitBlockDataL(pCommitter); + // commit if not same schema + if (pCommitter->dWriter.bDatal.nRow > 0) { + if (pCommitter->dWriter.bDatal.suid != pCommitter->dReader.pRowInfo->suid || + pCommitter->dWriter.bDatal.suid == 0) { + code = tsdbCommitLastBlock(pCommitter); if (code) goto _err; } - } else if (pCommitter->dReader.pRow->suid != 0 /*todo*/) { - // code = tsdbCommitBlockDataL(pCommitter); + } + + if (pCommitter->dWriter.bDatal.nRow == 0) { + code = tsdbCommitterUpdateTableSchema(pCommitter, pCommitter->dReader.pRowInfo->suid, + pCommitter->dReader.pRowInfo->suid, 1 /*TODO*/); + if (code) goto _err; + + pCommitter->dWriter.bDatal.suid = pCommitter->dReader.pRowInfo->suid; + code = tBlockDataSetSchema(&pCommitter->dWriter.bDatal, pCommitter->skmTable.pTSchema); if (code) goto _err; } - // append - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRow->row, NULL); - if (code) goto _err; + // check if it can make sure that one table data in one block + int64_t uid = pCommitter->dReader.pRowInfo->uid; + int32_t nRow = 0; + for (int32_t iRow = pCommitter->dReader.iRow; + (iRow < pCommitter->dReader.bDatal.nRow) && (pCommitter->dReader.bDatal.aUid[iRow] == uid); iRow++) { + nRow++; + } - // next - pCommitter->dReader.iRow++; - if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { - pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; - pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); - } else { - pCommitter->dReader.iBlockL++; - if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); + ASSERT(nRow > 0 && nRow < pCommitter->minRow); - // TODO: code = tsdbReadBlockData(pCommitter->dReader.pReader, NULL, pBlockL, &pCommitter->dReader.bDatal, NULL, - // NULL); - if (code) goto _err; + if (pCommitter->dWriter.bDatal.nRow + nRow > pCommitter->maxRow) { + ASSERT(pCommitter->dWriter.bDatal.nRow > 0); - pCommitter->dReader.iRow = 0; - pCommitter->dReader.pRow->suid = pCommitter->dReader.pBlockL->suid; - pCommitter->dReader.pRow->uid = pCommitter->dReader.bDatal.aUid[pCommitter->dReader.iRow]; - pCommitter->dReader.pRow->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); - } else { - pCommitter->dReader.pRow = NULL; - } + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; } - // write - if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { - // code = tsdbCommitBlockDataL(pCommitter); + while (nRow > 0) { + code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL); + if (code) goto _err; + + code = tsdbCommitNextLastRow(pCommitter); if (code) goto _err; + + nRow--; } } @@ -1192,7 +1185,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); // move commit until current (suid, uid) - code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = pTbData->suid, .uid = pTbData->uid}); + code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData); if (code) goto _err; // commit current table data @@ -1204,7 +1197,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; if (pCommitter->dWriter.bDatal.nRow > 0) { - // code = tsdbCommitBlockDataL(pCommitter); + code = tsdbCommitLastBlock(pCommitter); if (code) goto _err; } @@ -1261,15 +1254,15 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } + code = tBlockDataInit(&pCommitter->dReader.bData); + if (code) goto _exit; + pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL)); if (pCommitter->dReader.aBlockL == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - code = tBlockDataInit(&pCommitter->dReader.bData); - if (code) goto _exit; - code = tBlockDataInit(&pCommitter->dReader.bDatal); if (code) goto _exit; @@ -1299,9 +1292,9 @@ _exit: static void tsdbCommitDataEnd(SCommitter *pCommitter) { // Reader taosArrayDestroy(pCommitter->dReader.aBlockIdx); - taosArrayDestroy(pCommitter->dReader.aBlockL); tMapDataClear(&pCommitter->dReader.mBlock); tBlockDataClear(&pCommitter->dReader.bData, 1); + taosArrayDestroy(pCommitter->dReader.aBlockL); tBlockDataClear(&pCommitter->dReader.bDatal, 1); // Writer @@ -1329,9 +1322,6 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { // impl ==================== pCommitter->nextKey = pMemTable->minKey; while (pCommitter->nextKey < TSKEY_MAX) { - pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); - tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, - &pCommitter->maxKey); code = tsdbCommitFileData(pCommitter); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 22615b9508..79929c44a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -543,9 +543,13 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB uint32_t delimiter; SBlockIdx blockIdx; - if (!ppBuf) ppBuf = &pBuf; + taosArrayClear(aBlockIdx); + if (size == 0) { + goto _exit; + } // alloc + if (!ppBuf) ppBuf = &pBuf; code = tRealloc(ppBuf, size); if (code) goto _err; @@ -576,7 +580,6 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB n = tGetU32(*ppBuf + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - taosArrayClear(aBlockIdx); while (n < size - sizeof(TSCKSUM)) { n += tGetBlockIdx(*ppBuf + n, &blockIdx); @@ -588,6 +591,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB ASSERT(n + sizeof(TSCKSUM) == size); +_exit: tFree(pBuf); return code; @@ -606,9 +610,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) uint8_t *pBuf = NULL; SBlockL blockl; - if (!ppBuf) ppBuf = &pBuf; + taosArrayClear(aBlockL); + if (size == 0) { + goto _exit; + } // alloc + if (!ppBuf) ppBuf = &pBuf; code = tRealloc(ppBuf, size); if (code) goto _err; @@ -639,7 +647,6 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) n = tGetU32(*ppBuf + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - taosArrayClear(aBlockL); while (n < size - sizeof(TSCKSUM)) { n += tGetBlockL(*ppBuf + n, &blockl); @@ -651,11 +658,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) ASSERT(n + sizeof(TSCKSUM) == size); +_exit: tFree(pBuf); return code; _err: tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tFree(pBuf); return code; } @@ -1562,7 +1571,11 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp int64_t size; int64_t n; - if (!ppBuf) ppBuf = &pBuf; + // check + if (taosArrayGetSize(aBlockIdx) == 0) { + pHeadFile->offset = pHeadFile->size; + goto _exit; + } // prepare size = tPutU32(NULL, TSDB_FILE_DLMT); @@ -1572,6 +1585,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp size += sizeof(TSCKSUM); // alloc + if (!ppBuf) ppBuf = &pBuf; code = tRealloc(ppBuf, size); if (code) goto _err; @@ -1596,6 +1610,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp pHeadFile->offset = pHeadFile->size; pHeadFile->size += size; +_exit: tFree(pBuf); return code; @@ -1662,6 +1677,12 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) int64_t size; int64_t n; + // check + if (taosArrayGetSize(aBlockL) == 0) { + pHeadFile->loffset = pHeadFile->size; + goto _exit; + } + // size size = sizeof(uint32_t); for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { @@ -1695,10 +1716,13 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) pHeadFile->loffset = pHeadFile->size; pHeadFile->size += size; +_exit: + tFree(pBuf); return code; _err: tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + tFree(pBuf); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 4e0d1b2402..205d8b29dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1122,6 +1122,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData) { int32_t code = 0; pBlockData->nRow = 0; + pBlockData->aUid = NULL; pBlockData->aVersion = NULL; pBlockData->aTSKEY = NULL; pBlockData->aIdx = taosArrayInit(0, sizeof(int32_t)); @@ -1146,6 +1147,7 @@ void tBlockDataReset(SBlockData *pBlockData) { } void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { + tFree((uint8_t *)pBlockData->aUid); tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); taosArrayDestroy(pBlockData->aIdx); -- GitLab