diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 29626ed67d033c3f3ab4b219b7603c8f4fcbe752..4c7dbd0a456881a0f244d3193bfcb5318d2cb6c6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -140,9 +140,9 @@ void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +void tBlockDataClearData(SBlockData *pBlockData); int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); -void tBlockDataClearData(SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 3f38b664c2fbc0a6fe8f20fe82e03693b54c13d8..e5c6ee44a0e8b6afcbf25d2a1c728087a929d201 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -299,7 +299,9 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; pCommitter->dReader.iRow++; if (pCommitter->dReader.iRow < pBlockDatal->nRow) { - if (pBlockDatal->uid == 0) { + if (pBlockDatal->uid) { + pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; + } else { pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; } pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); @@ -327,6 +329,28 @@ _exit: return code; } +static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { + int32_t code = 0; + + ASSERT(pCommitter->dReader.pBlockIdx); + + pCommitter->dReader.iBlockIdx++; + if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { + pCommitter->dReader.pBlockIdx = + (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); + + code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); + if (code) goto _exit; + + ASSERT(pCommitter->dReader.mBlock.nItem > 0); + } else { + pCommitter->dReader.pBlockIdx = NULL; + } + +_exit: + return code; +} + static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -469,37 +493,18 @@ _exit: return code; } -static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx, - int8_t toDataOnly) { - int32_t code = 0; - - if (pBlock->nSubBlock == 0) { - if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { - pBlock->last = 1; - } else { - pBlock->last = 0; - } - } - - code = - tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); - if (code) goto _err; - - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); - if (code) goto _err; - - return code; - -_err: - return code; -} - -static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { +static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { int32_t code = 0; SBlock block; ASSERT(pCommitter->dWriter.bData.nRow > 0); + if (pBlock) { + block = *pBlock; // as a subblock + } else { + tBlockReset(&block); // as a new block + } + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL, pCommitter->cmprAlg); if (code) goto _exit; @@ -556,14 +561,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; - code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema); + code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema, pTbData->uid); if (code) goto _err; // next tsdbTbDataIterNext(pIter); pRow1 = tsdbTbDataIterGet(pIter); } else if (c > 0) { - code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); + code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid); if (code) goto _err; iRow++; @@ -578,13 +583,13 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S // check if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; } } while (pRow2) { - code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL); + code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid); if (code) goto _err; iRow++; @@ -596,14 +601,14 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S // check if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; } } // check if (pBlockDataW->nRow > 0) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; } @@ -620,7 +625,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter SBlockData *pBlockData = &pCommitter->dWriter.bData; tBlockDataClearData(pBlockData); - TSDBROW *pRow = tsdbTbDataIterGet(pIter); while (true) { if (pRow == NULL) { @@ -636,7 +640,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter if (code) goto _err; // append - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -650,7 +654,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { _write_block: - code = tsdbCommitDataBlock(pCommitter); + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; } } @@ -662,29 +666,6 @@ _err: return code; } -static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) { - int32_t code = 0; - SBlock block; - - if (pBlock->last) { - code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, &pCommitter->dReader.bData, NULL, NULL); - if (code) goto _err; - - tBlockReset(&block); - code = tsdbCommitBlockData(pCommitter, &pCommitter->dReader.bData, &block, pBlockIdx, 0); - if (code) goto _err; - } else { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); - if (code) goto _err; - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { int32_t nRow = 0; @@ -713,31 +694,29 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S SBlockData *pBlockData = &pCommitter->dWriter.bData; tBlockDataClearData(pBlockData); + TSDBROW *pRow = tsdbTbDataIterGet(pIter); while (true) { - TSDBROW *pRow = tsdbTbDataIterGet(pIter); + if (pRow == NULL) break; + + code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); if (pRow) { TSDBKEY rowKey = TSDBROW_KEY(pRow); if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) { pRow = NULL; } } - - if (pRow == NULL) { - break; - } - - code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); - if (code) goto _err; } - SBlock block = *pBlock; - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &block, NULL, NULL, pCommitter->cmprAlg); - if (code) goto _err; + ASSERT(pBlockData->nRow > 0 && pBlock->nRow + pBlockData->nRow <= pCommitter->maxRow); - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); + code = tsdbCommitDataBlock(pCommitter, pBlock); if (code) goto _err; return code; @@ -753,36 +732,51 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); if (pCommitter->dReader.pRowInfo) { - for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { - if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; - nRow++; + if (pCommitter->dReader.pRowInfo->suid) { + for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { + if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; + nRow++; + } + } else { + ASSERT(pCommitter->dReader.iRow == 0); + nRow += pCommitter->dReader.bDatal.nRow; } } if (nRow == 0) goto _exit; - SBlockData *pBlockData; - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo; - + TSDBROW *pRow = tsdbTbDataIterGet(pIter); if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { pRow = NULL; } - if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo; + if (pRowInfo && pRowInfo->uid != pTbData->uid) { pRowInfo = NULL; } while (nRow) { + SBlockData *pBlockData; + int8_t toData; + if (nRow < pCommitter->minRow) { // to .last + toData = 0; pBlockData = &pCommitter->dWriter.bDatal; - // check if same schema - if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; + // commit and reset block data schema if need + if (pBlockData->nRow > 0) { + if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; - code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); + tBlockDataReset(pBlockData); + } + } + + // set block data schema if need + if (pBlockData->suid == 0 && pBlockData->uid == 0) { + code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema, pTbData->suid, + pTbData->suid ? 0 : pTbData->uid); if (code) goto _err; } @@ -791,7 +785,9 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { if (code) goto _err; } } else { // to .data + toData = 1; pBlockData = &pCommitter->dWriter.bData; + ASSERT(pBlockData->nRow == 0); } while (pRow && pRowInfo) { @@ -800,7 +796,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -809,14 +805,14 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { pRow = NULL; } } else if (c > 0) { - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid); if (code) goto _err; code = tsdbCommitterNextLastRow(pCommitter); if (code) goto _err; pRowInfo = pCommitter->dReader.pRowInfo; - if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + if (pRowInfo && pRowInfo->uid != pTbData->uid) { pRowInfo = NULL; } } else { @@ -824,16 +820,12 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { } nRow--; - - if (pBlockData->uid) { // .data block - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter); + if (toData) { + if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; - goto _outer_break; } - } else { - ASSERT(pBlockData->nRow <= pCommitter->maxRow); } } @@ -841,7 +833,7 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -851,40 +843,34 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { } nRow--; - if (pBlockData->uid) { // .data block - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter); + if (toData) { + if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; - goto _outer_break; } - } else { - ASSERT(pBlockData->nRow <= pCommitter->maxRow); } } while (pRowInfo) { - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL); + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid); if (code) goto _err; code = tsdbCommitterNextLastRow(pCommitter); if (code) goto _err; pRowInfo = pCommitter->dReader.pRowInfo; - if (pRowInfo && (pRowInfo->suid != pTbData->suid || pRowInfo->uid != pTbData->uid)) { + if (pRowInfo && pRowInfo->uid != pTbData->uid) { pRowInfo = NULL; } nRow--; - if (pBlockData->uid) { // .data block - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter); + if (toData) { + if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); if (code) goto _err; - goto _outer_break; } - } else { - ASSERT(pBlockData->nRow <= pCommitter->maxRow); } } @@ -922,8 +908,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { int32_t iBlock = 0; SBlock block; SBlock *pBlock = █ - if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0 && - iBlock < pCommitter->dReader.mBlock.nItem) { + if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; @@ -933,8 +918,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { if (code) goto _err; tMapDataReset(&pCommitter->dWriter.mBlock); - code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema); + code = tBlockDataSetSchema(&pCommitter->dWriter.bData, pCommitter->skmTable.pTSchema, pTbData->suid, pTbData->uid); if (code) goto _err; + + // .data merge while (pBlock && pRow) { int32_t c = tBlockCmprFn(pBlock, &(SBlock){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); if (c < 0) { // disk @@ -997,25 +984,9 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { } } - // merge with last + // .data append and .last merge code = tsdbMergeCommitLast(pCommitter, pIter); if (code) goto _err; -#if 0 - int32_t nRowLeft = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = - VERSION_MIN}); if (pCommitter->dReader.pRowInfo) { - for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { - int64_t uid = pCommitter->dReader.bDatal.aUid[iRow]; - if (uid == pTbData->uid) { - nRowLeft++; - } - } - } - - while (nRowLeft) { - code = tsdbMergeCommitLast(pCommitter, pIter, &nRowLeft); - if (code) goto _err; - } -#endif // end if (pCommitter->dWriter.mBlock.nItem > 0) { @@ -1081,7 +1052,7 @@ _err: static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { int32_t code = 0; - // data + // .data while (true) { if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break; @@ -1094,61 +1065,64 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { goto _err; } - pCommitter->dReader.iBlockIdx++; - if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { - pCommitter->dReader.pBlockIdx = - (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - - code = - tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL); - if (code) goto _err; - } else { - pCommitter->dReader.pBlockIdx = NULL; - } + code = tsdbCommitterNextTableData(pCommitter); + if (code) goto _err; } - // last + // .last while (true) { if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break; - // commit if not same schema - if (pCommitter->dWriter.bDatal.nRow > 0) { - if (pCommitter->dWriter.bDatal.suid != pCommitter->dReader.pRowInfo->suid || - pCommitter->dWriter.bDatal.suid == 0) { + SBlockData *pBlockDataR = &pCommitter->dReader.bDatal; + SBlockData *pBlockDataW = &pCommitter->dWriter.bDatal; + tb_uid_t suid = pCommitter->dReader.pRowInfo->suid; + tb_uid_t uid = pCommitter->dReader.pRowInfo->uid; + + ASSERT((pBlockDataR->suid && !pBlockDataR->uid) || (!pBlockDataR->suid && pBlockDataR->uid)); + ASSERT(pBlockDataR->nRow > 0); + + // commit and reset block data schema if need + if (pBlockDataW->nRow > 0) { + if (pBlockDataW->suid != pCommitter->dReader.pRowInfo->suid || pBlockDataW->suid == 0) { code = tsdbCommitLastBlock(pCommitter); if (code) goto _err; + + tBlockDataReset(pBlockDataW); } } - if (pCommitter->dWriter.bDatal.nRow == 0) { - code = tsdbCommitterUpdateTableSchema(pCommitter, pCommitter->dReader.pRowInfo->suid, - pCommitter->dReader.pRowInfo->suid, 1 /*TODO*/); + // set block data schema if need + if (pBlockDataW->suid == 0 && pBlockDataW->uid == 0) { + code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TOOD*/); if (code) goto _err; - pCommitter->dWriter.bDatal.suid = pCommitter->dReader.pRowInfo->suid; - code = tBlockDataSetSchema(&pCommitter->dWriter.bDatal, pCommitter->skmTable.pTSchema); + code = tBlockDataSetSchema(pBlockDataW, pCommitter->skmTable.pTSchema, suid, suid ? 0 : uid); if (code) goto _err; } // check if it can make sure that one table data in one block - int64_t uid = pCommitter->dReader.pRowInfo->uid; int32_t nRow = 0; - for (int32_t iRow = pCommitter->dReader.iRow; - (iRow < pCommitter->dReader.bDatal.nRow) && (pCommitter->dReader.bDatal.aUid[iRow] == uid); iRow++) { - nRow++; + if (pBlockDataR->suid) { + for (int32_t iRow = pCommitter->dReader.iRow; (iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid); + iRow++) { + nRow++; + } + } else { + ASSERT(pCommitter->dReader.iRow == 0); + nRow = pBlockDataR->nRow; } ASSERT(nRow > 0 && nRow < pCommitter->minRow); - if (pCommitter->dWriter.bDatal.nRow + nRow > pCommitter->maxRow) { - ASSERT(pCommitter->dWriter.bDatal.nRow > 0); + if (pBlockDataW->nRow + nRow > pCommitter->maxRow) { + ASSERT(pBlockDataW->nRow > 0); code = tsdbCommitLastBlock(pCommitter); if (code) goto _err; } while (nRow > 0) { - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pCommitter->dReader.pRowInfo->row, NULL); + code = tBlockDataAppendRow(pBlockDataW, &pCommitter->dReader.pRowInfo->row, NULL, uid); if (code) goto _err; code = tsdbCommitterNextLastRow(pCommitter);