From b26006d3414064830a9e1271fd22672c49e16fd2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jan 2021 23:31:01 +0800 Subject: [PATCH] refact tsdb commit --- src/tsdb/src/tsdbCommit.c | 41 +++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 8300cfbdca..ec633c64bd 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -739,12 +739,6 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); - // Seek file - offset = tsdbSeekDFile(pDFile, 0, SEEK_END); - if (offset < 0) { - return -1; - } - // Make buffer space if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { return -1; @@ -842,7 +836,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); // Write the whole block to file - if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize) < lsize) { + if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { return -1; } @@ -1005,8 +999,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { return -1; } } @@ -1076,7 +1069,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); - if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, pBlock->numOfSubBlocks + 1) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; @@ -1087,16 +1080,28 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith); - // SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh)); + SDFile *pDFile; SBlock block; + bool isSameFile; + + ASSERT(pBlock->numOfSubBlocks > 0); - if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) { + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isSameFile = pCommith->isLFileSame; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isSameFile = pCommith->isDFileSame; + } + + if (isSameFile) { if (pBlock->numOfSubBlocks == 1) { - if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { + return -1; + } } else { block = *pBlock; - block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlk); + block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) < 0) { @@ -1105,7 +1110,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { } } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; + if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; } @@ -1113,14 +1118,12 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { } static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { - ASSERT(pSupBlock != NULL); - if (taosArrayPush(pCommith->aSupBlk, pSupBlock) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - if (pSubBlocks && taosArrayPushBatch(pCommith->aSupBlk, pSubBlocks, nSubBlocks) < 0) { + if (pSubBlocks && taosArrayPushBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } -- GitLab