diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index d9a67ce4871d3267eb4759251f8ab7043cfc1bb6..932cf9f9ae9888c72e347f47b15d23d0eca9e788 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -488,52 +488,42 @@ _err: return code; } -static int32_t tsdbSnapWriteTableDataAhead(STsdbSnapWriter* pWriter, TABLEID id) { +static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { int32_t code = 0; - if (pWriter->pDataFReader == NULL) goto _exit; - - while (true) { - if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; - - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); - - if (c >= 0) break; - - pWriter->iBlockIdx++; - - code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); - if (code) goto _err; - - SBlock block; - tMapDataReset(&pWriter->mBlockW); - for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); + code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; - if (block.last) { - code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - if (code) goto _err; + // SBlockData + SBlock block; + tMapDataReset(&pWriter->mBlockW); + for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); - tBlockReset(&block); - block.last = 1; - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, - pWriter->cmprAlg); - if (code) goto _err; - } + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + tBlockReset(&block); + block.last = 1; + code = + tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg); if (code) goto _err; } - SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); if (code) goto _err; + } - if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + // SBlock + SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + if (code) goto _err; + + // SBlockIdx + if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } _exit: @@ -543,77 +533,19 @@ _err: return code; } -static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { +static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { int32_t code = 0; SBlockData* pBlockData = &pWriter->bData; - TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); - TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - - // TABLE ==================================== - - // end last table write if should - if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); - if (c < 0) { - // end - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; + int32_t iRow = 0; + TSDBROW row; + TSDBROW* pRow = &row; - // reset - pWriter->pBlockIdxW = NULL; - } else if (c > 0) { - ASSERT(0); - } - } - - // start new table data write if need - if (pWriter->pBlockIdxW == NULL) { - // write table data ahead - code = tsdbSnapWriteTableDataAhead(pWriter, id); - if (code) goto _err; - - // reader - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - ASSERT(pWriter->pDataFReader); - - pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); - if (c) { - ASSERT(c > 0); - pWriter->pBlockIdx = NULL; - } else { - pWriter->iBlockIdx++; - } - } else { - pWriter->pBlockIdx = NULL; - } - - if (pWriter->pBlockIdx) { - code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL); - if (code) goto _err; - } else { - tMapDataReset(&pWriter->mBlock); - } - pWriter->iBlock = 0; - pWriter->pBlockData = NULL; - pWriter->iRow = 0; - - // writer - pWriter->pBlockIdxW = &pWriter->blockIdxW; - pWriter->pBlockIdxW->suid = id.suid; - pWriter->pBlockIdxW->uid = id.uid; - - tBlockReset(&pWriter->blockW); - tBlockDataReset(&pWriter->bDataW); - tMapDataReset(&pWriter->mBlockW); - } - - ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); - ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); + // correct schema + code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); + if (code) goto _err; - // BLOCK ==================================== - int32_t iRow = 0; - TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow); + // loop to merge + *pRow = tsdbRowFromBlockData(pBlockData, iRow); while (true) { if (pRow == NULL) break; @@ -630,7 +562,7 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { iRow++; if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); + *pRow = tsdbRowFromBlockData(pBlockData, iRow); } else { pRow = NULL; } @@ -644,55 +576,60 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { } } } else { - SBlock block; + TSDBKEY key = TSDBROW_KEY(pRow); - if (pWriter->iBlock < pWriter->mBlock.nItem) { - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + + SBlock block; int32_t c; - c = tsdbKeyCmprFn(&block.maxKey, &TSDBROW_KEY(pRow)); + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); - ASSERT(c); + if (block.last) { + pWriter->pBlockData = &pWriter->bDataR; - if (c < 0) { - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); if (code) goto _err; + pWriter->iRow = 0; pWriter->iBlock++; - continue; + break; } - c = tsdbKeyCmprFn(&block.minKey, &TSDBROW_KEY(pRow)); + c = tsdbKeyCmprFn(&block.maxKey, &key); ASSERT(c); - if (c > 0) { - code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (c > 0) break; + + if (pWriter->bDataW.nRow) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; - goto _check_write; + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); } - pWriter->pBlockData = &pWriter->bDataR; - code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); if (code) goto _err; - pWriter->iRow = 0; + pWriter->iBlock++; } + if (pWriter->pBlockData) continue; + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); if (code) goto _err; iRow++; - if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); + if (iRow < pBlockData->nRow) { + *pRow = tsdbRowFromBlockData(pBlockData, iRow); } else { pRow = NULL; } @@ -713,6 +650,91 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { tBlockDataClearData(&pWriter->bDataW); } + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { + int32_t code = 0; + SBlockData* pBlockData = &pWriter->bData; + TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); + TSDBKEY keyLast = tBlockDataLastKey(pBlockData); + + // end last table write if should + if (pWriter->pBlockIdxW) { + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + if (c < 0) { + // end + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + + // reset + pWriter->pBlockIdxW = NULL; + } else if (c > 0) { + ASSERT(0); + } + } + + // start new table data write if need + if (pWriter->pBlockIdxW == NULL) { + // write table data ahead + while (true) { + if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + if (c >= 0) break; + + code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); + if (code) goto _err; + + pWriter->iBlockIdx++; + } + + // reader + if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + ASSERT(pWriter->pDataFReader); + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + ASSERT(c >= 0); + + if (c == 0) { + pWriter->pBlockIdx = pBlockIdx; + pWriter->iBlockIdx++; + } + } + + if (pWriter->pBlockIdx) { + code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; + } else { + tMapDataReset(&pWriter->mBlock); + } + pWriter->iBlock = 0; + pWriter->pBlockData = NULL; + pWriter->iRow = 0; + + // writer + pWriter->pBlockIdxW = &pWriter->blockIdxW; + pWriter->pBlockIdxW->suid = id.suid; + pWriter->pBlockIdxW->uid = id.uid; + + tBlockReset(&pWriter->blockW); + tBlockDataReset(&pWriter->bDataW); + tMapDataReset(&pWriter->mBlockW); + } + + ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); + ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); + + code = tsdbSnapWriteTableDataImpl(pWriter); + if (code) goto _err; + _exit: return code; @@ -723,11 +745,10 @@ _err: } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - TABLEID id = *(TABLEID*)(&pHdr[1]); - int64_t n; + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); + int64_t n; // decode SBlockData* pBlockData = &pWriter->bData; @@ -742,13 +763,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { // end last file data write if need - code = tsdbSnapWriteDataEnd(pWriter); // todo + code = tsdbSnapWriteDataEnd(pWriter); if (code) goto _err; pWriter->fid = fid; // read - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); // todo: check nState is valid + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); if (pSet) { code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; @@ -763,8 +784,9 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 pWriter->pBlockIdx = NULL; tMapDataReset(&pWriter->mBlock); pWriter->iBlock = 0; - tBlockDataReset(&pWriter->bDataR); + pWriter->pBlockData = NULL; pWriter->iRow = 0; + tBlockDataReset(&pWriter->bDataR); // write SDFileSet wSet; @@ -789,12 +811,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 if (code) goto _err; taosArrayClear(pWriter->aBlockIdxW); - pWriter->pBlockIdxW = NULL; tMapDataReset(&pWriter->mBlockW); + pWriter->pBlockIdxW = NULL; tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteDataImpl(pWriter, id); + code = tsdbSnapWriteTableData(pWriter, id); if (code) goto _err; tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",