From 447217670fa38b83c105612dd96113da3e71245c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jan 2021 11:35:02 +0000 Subject: [PATCH] more refact --- src/tsdb/src/tsdbCommit.c | 175 +++++++++----------------------------- 1 file changed, 40 insertions(+), 135 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index cad88871a8..8300cfbdca 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -604,182 +604,90 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { SCommitIter *pIter = pCommith->iters + tid; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + tsdbResetCommitTable(pCommith); + TSDB_RLOCK_TABLE(pIter->pTable); // Set commit table - tsdbResetCommitTable(pCommith); if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } + // No disk data and no memory data, just return if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { - // No disk data and no memory data TSDB_RUNLOCK_TABLE(pIter->pTable); return 0; - } else { - // Must has disk data, maybe has memory data - int nBlocks; - int bidx = 0; - SBlock *pBlock; - - if (pCommith->readh.pBlkIdx) { - if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - } else { - nBlocks = 0; - } - - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - - while (true) { - if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; - - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || - (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { - if (tsdbMoveBlock(pCommith, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // merge pBlock data and memory data - if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - nextKey = tsdbNextIterKey(pIter->pIter); - } else { - // Only commit memory data - if (pBlock == NULL) { - if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } else { - if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } - nextKey = tsdbNextIterKey(pIter->pIter); - } - } } -#if 0 - if (!pCommith->isRFileSet) { - if (pIter->pIter == NULL) { - // No memory data - TSDB_RUNLOCK_TABLE(pIter->pTable); - return 0; - } else { - // TODO: think about no data committed at all - if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, true) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } + // Must has disk data or has memory data + int nBlocks; + int bidx = 0; + SBlock *pBlock; + if (pCommith->readh.pBlkIdx) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); - if (tsdbWriteBlockInfo(pCommith) < 0) { - return -1; - } - - return 0; + return -1; } - } - // No memory data and no disk data, just return - if (pIter->pIter == NULL && pCommith->readh.pBlkIdx == NULL) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return 0; - } - - if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; + nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + } else { + nBlocks = 0; } - // Process merge commit - int nBlocks = (pCommith->readh.pBlkIdx == NULL) ? 0 : pCommith->readh.pBlkIdx->numOfBlocks; - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - int cidx = 0; - void * ptr = NULL; - SBlock *pBlock; - - if (cidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + cidx; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } while (true) { - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) && (pBlock == NULL)) break; + if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { - if (tsdbMoveBlock(pCommith, cidx) < 0) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } - cidx++; - if (cidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + cidx; + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } - } else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - if (tsdbMergeMemData(pCommith, pIter, cidx) < 0) { + } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { + // merge pBlock data and memory data + if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } - cidx++; - if (cidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + cidx; + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } nextKey = tsdbNextIterKey(pIter->pIter); } else { + // Only commit memory data if (pBlock == NULL) { if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } - nextKey = tsdbNextIterKey(pIter->pIter); } else { if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } - nextKey = tsdbNextIterKey(pIter->pIter); } + nextKey = tsdbNextIterKey(pIter->pIter); } } -#endif TSDB_RUNLOCK_TABLE(pIter->pTable); @@ -961,8 +869,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo } static int tsdbWriteBlockInfo(SCommitH *pCommih) { - SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - + SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); SBlockIdx blkIdx; STable * pTable = TSDB_COMMIT_TABLE(pCommih); SBlock * pBlock; @@ -974,9 +881,13 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { nSupBlocks = taosArrayGetSize(pCommih->aSupBlk); nSubBlocks = taosArrayGetSize(pCommih->aSubBlk); - tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM); - ASSERT(nSupBlocks > 0); + if (nSupBlocks <= 0) { + // No data (data all deleted) + return 0; + } + + tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM); // Write SBlockInfo part if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1; @@ -1001,16 +912,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); - offset = tsdbSeekDFile(pHeadf, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to write block info part to file %s while seek since %s", TSDB_COMMIT_REPO_ID(pCommih), - TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); - return -1; - } - - if (tsdbWriteDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen) < tlen) { - tsdbError("vgId:%d failed to write block info part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih), - TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); + if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) { return -1; } @@ -1044,7 +946,10 @@ static int tsdbWriteBlockIdx(SCommitH *pCommih) { int tlen = 0, size; int64_t offset; - ASSERT(nidx > 0); + if (nidx <= 0) { + // All data are deleted + return 0; + } for (size_t i = 0; i < nidx; i++) { pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i); -- GitLab