diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 59e5d360641de11741dd348fdd9632fd7011c46f..bb28c41e93af548581b3fc063f120cfa4b3ef08e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -559,18 +559,14 @@ _err: return code; } -static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlock block; ASSERT(pBlockData->nRow > 0); - if (pBlock) { - block = *pBlock; // as a subblock - } else { - tBlockReset(&block); // as a new block - } + tBlockReset(&block); // info block.nRow += pBlockData->nRow; @@ -1547,14 +1543,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { } } - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter, NULL); + if (pBlockData->nRow >= pCommitter->maxRow) { + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; } } if (pBlockData->nRow) { - code = tsdbCommitDataBlock(pCommitter, NULL); + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; } @@ -1616,8 +1612,8 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { ASSERT(0); } - if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter, NULL); + if (pBDataW->nRow >= pCommitter->maxRow) { + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; } } @@ -1633,14 +1629,14 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { pRow = NULL; } - if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter, NULL); + if (pBDataW->nRow >= pCommitter->maxRow) { + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; } } if (pBDataW->nRow) { - code = tsdbCommitDataBlock(pCommitter, NULL); + code = tsdbCommitDataBlock(pCommitter); if (code) goto _err; } @@ -1724,59 +1720,146 @@ _err: return code; } +static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { + int32_t code = 0; + + SBlockData *pBData = &pCommitter->dWriter.bData; + SBlockData *pBDatal = &pCommitter->dWriter.bDatal; + + if (pBDatal->suid || pBDatal->uid) { + if (pBDatal->suid != pBData->suid || pBDatal->suid == 0) { + if (pBDatal->nRow) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + tBlockDataReset(pBDatal); + } + } + + if (!pBDatal->suid && !pBDatal->uid) { + ASSERT(pCommitter->skmTable.suid == pBData->suid); + ASSERT(pCommitter->skmTable.uid == pBData->uid); + code = tBlockDataInit(pBDatal, pBData->suid, 0, pCommitter->skmTable.pTSchema); + if (code) goto _err; + } + + for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { + TSDBROW row = tsdbRowFromBlockData(pBData, iRow); + code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid); + if (code) goto _err; + + if (pBDatal->nRow >= pCommitter->maxRow) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + } + + return code; + +_err: + return code; +} + static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); + int32_t code = 0; + SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { pRowInfo = NULL; } - if (pRowInfo == NULL) goto _err; + if (pRowInfo == NULL) goto _exit; -#if 0 - if (pBlockData->suid || pBlockData->uid) { - if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { - if (pBlockData->nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; + if (pCommitter->toLastOnly) { + SBlockData *pBDatal = &pCommitter->dWriter.bDatal; + + if (pBDatal->suid || pBDatal->uid) { + if (pBDatal->suid != id.suid || pBDatal->suid == 0) { + if (pBDatal->nRow) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } + tBlockDataReset(pBDatal); } + } - tBlockDataReset(pBlockData); + if (!pBDatal->suid && !pBDatal->uid) { + ASSERT(pCommitter->skmTable.suid == id.suid); + ASSERT(pCommitter->skmTable.uid == id.uid); + code = tBlockDataInit(pBDatal, id.suid, 0, pCommitter->skmTable.pTSchema); + if (code) goto _err; } - } - if (!pBlockData->suid && !pBlockData->uid) { - code = tBlockDataInit(pBlockData, pTbData->suid, 0, pCommitter->skmTable.pTSchema); - if (code) goto _err; - } -#endif + while (pRowInfo) { + STSchema *pTSchema = NULL; + if (pRowInfo->row.type == 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + if (code) goto _err; + pTSchema = pCommitter->skmRow.pTSchema; + } - SBlockData *pBlockData = NULL; // TODO - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + code = tBlockDataAppendRow(pBDatal, &pRowInfo->row, pTSchema, id.uid); + if (code) goto _err; + + code = tsdbNextCommitRow(pCommitter); if (code) goto _err; - pTSchema = pCommitter->skmRow.pTSchema; + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { + pRowInfo = NULL; + } + + if (pBDatal->nRow >= pCommitter->maxRow) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } } + } else { + SBlockData *pBData = &pCommitter->dWriter.bData; - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pTSchema, id.uid); - if (code) goto _err; + ASSERT(pBData->nRow == 0); - code = tsdbNextCommitRow(pCommitter); - if (code) goto _err; + while (pRowInfo) { + STSchema *pTSchema = NULL; + if (pRowInfo->row.type == 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + if (code) goto _err; + pTSchema = pCommitter->skmRow.pTSchema; + } - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; + code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); + if (code) goto _err; + + code = tsdbNextCommitRow(pCommitter); + if (code) goto _err; + + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { + pRowInfo = NULL; + } + + if (pBData->nRow >= pCommitter->maxRow) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; + } + } + + if (pBData->nRow) { + if (pBData->nRow > pCommitter->minRow) { + code = tsdbCommitDataBlock(pCommitter); + if (code) goto _err; + } else { + code = tsdbAppendLastBlock(pCommitter); + if (code) goto _err; + } } } +_exit: return code; _err: + tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -1830,7 +1913,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); if (code) goto _err; - // TODO: here may have problem if (pCommitter->dWriter.bDatal.nRow > 0) { code = tsdbCommitLastBlock(pCommitter); if (code) goto _err;