From ec2a069185934e9db7441092fd0c0a7b87a6333f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jul 2022 06:53:56 +0000 Subject: [PATCH] more vnode snapshot writer --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 222 ++++++++++++--------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 6 +- 2 files changed, 135 insertions(+), 93 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 51514f1257..d9a67ce487 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -377,10 +377,9 @@ struct STsdbSnapWriter { SBlockIdx* pBlockIdx; SMapData mBlock; // SMapData int32_t iBlock; - SBlock* pBlock; - SBlock block; - SBlockData bDataR; + SBlockData* pBlockData; int32_t iRow; + SBlockData bDataR; SDataFWriter* pDataFWriter; SBlockIdx* pBlockIdxW; // NULL when no committing table @@ -550,127 +549,171 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - if (pWriter->pDataFReader == NULL) { - // no old data + // TABLE ==================================== - // end last table write if need - if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + // end last table write if should + if (pWriter->pBlockIdxW) { + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + if (c < 0) { + // end + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; - if (c < 0) { - // end last table data write - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; + // reset + pWriter->pBlockIdxW = NULL; + } else if (c > 0) { + ASSERT(0); + } + } - // reset - pWriter->pBlockIdxW = NULL; - } else if (c > 0) { - ASSERT(0); + // start new table data write if need + if (pWriter->pBlockIdxW == NULL) { + // write table data ahead + code = tsdbSnapWriteTableDataAhead(pWriter, id); + if (code) goto _err; + + // reader + if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + ASSERT(pWriter->pDataFReader); + + pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); + if (c) { + ASSERT(c > 0); + pWriter->pBlockIdx = NULL; + } else { + pWriter->iBlockIdx++; } + } else { + pWriter->pBlockIdx = NULL; } - // start a new table data if need - if (pWriter->pBlockIdxW == NULL) { - pWriter->pBlockIdxW = &pWriter->blockIdxW; - pWriter->pBlockIdxW->suid = id.suid; - pWriter->pBlockIdxW->uid = id.uid; + if (pWriter->pBlockIdx) { + code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; + } else { + tMapDataReset(&pWriter->mBlock); + } + pWriter->iBlock = 0; + pWriter->pBlockData = NULL; + pWriter->iRow = 0; - tBlockReset(&pWriter->blockW); + // writer + pWriter->pBlockIdxW = &pWriter->blockIdxW; + pWriter->pBlockIdxW->suid = id.suid; + pWriter->pBlockIdxW->uid = id.uid; - tBlockDataReset(&pWriter->bDataW); + tBlockReset(&pWriter->blockW); + tBlockDataReset(&pWriter->bDataW); + tMapDataReset(&pWriter->mBlockW); + } - tMapDataReset(&pWriter->mBlockW); - } + ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); + ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); - // set block schema - code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); - if (code) goto _err; + // BLOCK ==================================== + int32_t iRow = 0; + TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow); + while (true) { + if (pRow == NULL) break; - // add rows - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); + if (pWriter->pBlockData) { + ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - code = tBlockDataAppendRow(&pWriter->bDataW, &row, NULL); - if (code) goto _err; + int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); + + ASSERT(c); - if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { - pWriter->blockW.last = 0; - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - &pWriter->blockW, pWriter->cmprAlg); + if (c < 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + iRow++; + if (iRow < pWriter->pBlockData->nRow) { + pRow = &tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); if (code) goto _err; - // reset - tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); + pWriter->iRow++; + if (pWriter->iRow >= pWriter->pBlockData->nRow) { + pWriter->pBlockData = NULL; + } } - } - } else { - // has old data + } else { + SBlock block; - // TABLE ================================================== - // end last table data if id not same (todo) - if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); - if (c < 0) { - } else if (c > 0) { - ASSERT(0); - } - } + if (pWriter->iBlock < pWriter->mBlock.nItem) { + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + int32_t c; - // start new table data if need (todo) - if (pWriter->pBlockIdxW == NULL) { - // commit table data ahead - code = tsdbSnapWriteTableDataAhead(pWriter, id); - if (code) goto _err; + c = tsdbKeyCmprFn(&block.maxKey, &TSDBROW_KEY(pRow)); + + ASSERT(c); - // reader - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); - if (c) { - pWriter->pBlockIdx = NULL; + if (c < 0) { + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + + pWriter->iBlock++; + continue; } - } else { - pWriter->pBlockIdx = NULL; - } - if (pWriter->pBlockIdx) { - code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlockW, NULL); + c = tsdbKeyCmprFn(&block.minKey, &TSDBROW_KEY(pRow)); + + ASSERT(c); + + if (c > 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; + + iRow++; + if (iRow < pWriter->pBlockData->nRow) { + pRow = &tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } + + goto _check_write; + } + + pWriter->pBlockData = &pWriter->bDataR; + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); if (code) goto _err; - pWriter->iBlock = 0; + pWriter->iRow = 0; } - // writer - pWriter->pBlockIdxW = &pWriter->blockIdxW; - pWriter->pBlockIdxW->suid = id.suid; - pWriter->pBlockIdxW->uid = id.uid; + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; - tBlockReset(&pWriter->blockW); - tBlockDataReset(&pWriter->bDataW); - tMapDataReset(&pWriter->mBlockW); + iRow++; + if (iRow < pWriter->pBlockData->nRow) { + pRow = &tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } } - // BLOCK ================================================== - // write block ahead - while (true) { - if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + _check_write: + if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; - SBlock block; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); - - if (tsdbKeyCmprFn(&block.maxKey, &keyFirst) >= 0) break; + _write_block: + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, + pWriter->cmprAlg); + if (code) goto _err; - pWriter->iBlock++; + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); - if (code) goto _err; - } + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); } +_exit: return code; _err: @@ -720,7 +763,6 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 pWriter->pBlockIdx = NULL; tMapDataReset(&pWriter->mBlock); pWriter->iBlock = 0; - pWriter->pBlock = NULL; tBlockDataReset(&pWriter->bDataR); pWriter->iRow = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 6320a48e77..04368ddfb7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -36,15 +36,15 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u // alloc code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); - if (code) goto _err; + if (code) goto _exit; code = tRealloc(&pMapData->pData, pMapData->nData); - if (code) goto _err; + if (code) goto _exit; // put pMapData->aOffset[nItem] = offset; tPutItemFn(pMapData->pData + offset, pItem); -_err: +_exit: return code; } -- GitLab