diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 108a138d34a598570a59739ce67bec192d581340..2395a4cd8b4185b98d96c31da79d0e244ae32007 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -53,7 +53,6 @@ typedef struct { // last SArray *aBlockL; // SArray int32_t iBlockL; - SBlockL *pBlockL; SBlockData bDatal; int32_t iRow; SRowInfo *pRowInfo; @@ -290,6 +289,54 @@ _err: return code; } +static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { + int32_t code = 0; + + if (pCommitter->skmTable.pTSchema) { + if (pCommitter->skmTable.suid == suid) { + if (suid == 0) { + if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit; + } else { + if (sver == pCommitter->skmTable.pTSchema->version) goto _exit; + } + } + } + + pCommitter->skmTable.suid = suid; + pCommitter->skmTable.uid = uid; + tTSchemaDestroy(pCommitter->skmTable.pTSchema); + code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema); + if (code) goto _exit; + +_exit: + return code; +} + +static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { + int32_t code = 0; + + if (pCommitter->skmRow.pTSchema) { + if (pCommitter->skmRow.suid == suid) { + if (suid == 0) { + if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit; + } else { + if (sver == pCommitter->skmRow.pTSchema->version) goto _exit; + } + } + } + + pCommitter->skmRow.suid = suid; + pCommitter->skmRow.uid = uid; + tTSchemaDestroy(pCommitter->skmRow.pTSchema); + code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); + if (code) { + goto _exit; + } + +_exit: + return code; +} + static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { int32_t code = 0; @@ -308,8 +355,17 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { } else { pCommitter->dReader.iBlockL++; if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { - pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); - code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, pBlockDatal); + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); + int64_t suid = pBlockL->suid; + int64_t uid = pBlockL->maxUid; + + code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TODO*/); + if (code) goto _exit; + + code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema); + if (code) goto _exit; + + code = tsdbReadLastBlock(pCommitter->dReader.pReader, pBlockL, pBlockDatal); if (code) goto _exit; pCommitter->dReader.iRow = 0; @@ -444,54 +500,6 @@ _err: return code; } -static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { - int32_t code = 0; - - if (pCommitter->skmTable.pTSchema) { - if (pCommitter->skmTable.suid == suid) { - if (suid == 0) { - if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit; - } else { - if (sver == pCommitter->skmTable.pTSchema->version) goto _exit; - } - } - } - - pCommitter->skmTable.suid = suid; - pCommitter->skmTable.uid = uid; - tTSchemaDestroy(pCommitter->skmTable.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema); - if (code) goto _exit; - -_exit: - return code; -} - -static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { - int32_t code = 0; - - if (pCommitter->skmRow.pTSchema) { - if (pCommitter->skmRow.suid == suid) { - if (suid == 0) { - if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } else { - if (sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } - } - } - - pCommitter->skmRow.suid = suid; - pCommitter->skmRow.uid = uid; - tTSchemaDestroy(pCommitter->skmRow.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); - if (code) { - goto _exit; - } - -_exit: - return code; -} - static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; @@ -779,13 +787,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { STbData *pTbData = pIter->pTbData; int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}); - if (pCommitter->dReader.pRowInfo) { - if (pCommitter->dReader.pRowInfo->suid) { + if (pCommitter->dReader.pRowInfo && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pRowInfo) == 0) { + if (pCommitter->dReader.pRowInfo->suid) { // super table for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) { if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break; nRow++; } - } else { + } else { // normal table ASSERT(pCommitter->dReader.iRow == 0); nRow += pCommitter->dReader.bDatal.nRow; } @@ -812,10 +820,13 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) { pBlockData = &pCommitter->dWriter.bDatal; // commit and reset block data schema if need - if (pBlockData->nRow > 0) { - if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; + // QUESTION: Is there a case that pBlockData->nRow == 0 but need to change schema ? + if (pBlockData->suid || pBlockData->uid) { + if (pBlockData->suid != pTbData->uid || pBlockData->suid == 0) { + if (pBlockData->nRow > 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } tBlockDataReset(pBlockData); } @@ -962,10 +973,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { pBlock = NULL; } - code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer); + code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer /*TODO*/); if (code) goto _err; tMapDataReset(&pCommitter->dWriter.mBlock); + code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); + if (code) goto _err; code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); if (code) goto _err; @@ -1130,11 +1143,12 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ASSERT(pBlockDataR->nRow > 0); // commit and reset block data schema if need - if (pBlockDataW->nRow > 0) { - if (pBlockDataW->suid != pCommitter->dReader.pRowInfo->suid || pBlockDataW->suid == 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - + if (pBlockDataW->suid || pBlockDataW->uid) { + if (pBlockDataW->suid != suid || pBlockDataW->suid == 0) { + if (pBlockDataW->nRow > 0) { + code = tsdbCommitLastBlock(pCommitter); + if (code) goto _err; + } tBlockDataReset(pBlockDataW); } } @@ -1151,9 +1165,10 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { // check if it can make sure that one table data in one block int32_t nRow = 0; if (pBlockDataR->suid) { - for (int32_t iRow = pCommitter->dReader.iRow; (iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid); - iRow++) { + int32_t iRow = pCommitter->dReader.iRow; + while ((iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid)) { nRow++; + iRow++; } } else { ASSERT(pCommitter->dReader.iRow == 0);