diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index f625d72d6324125ce9e77c8e031323db2437e7dc..e543c75fd3e3f68107becfa7af90a88a2e39bfeb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -14,6 +14,11 @@ */ #include "tsdb.h" +typedef struct { + int64_t suid; + int64_t uid; + STSchema *pTSchema; +} SSkmInfo; typedef struct { STsdb *pTsdb; @@ -38,9 +43,8 @@ typedef struct { SArray *aBlockIdxN; // SArray SMapData nBlockMap; // SMapData SBlockData nBlockData; - int64_t suid; - int64_t uid; - STSchema *pTSchema; + SSkmInfo skmTable; + SSkmInfo skmRow; /* commit del */ SDelFReader *pDelFReader; SDelFWriter *pDelFWriter; @@ -307,24 +311,49 @@ _err: return code; } -static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { +static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { + int32_t code = 0; + + if (pCommitter->skmTable.pTSchema) { + if (pCommitter->skmTable.suid == suid) { + if (suid == 0) { + if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit; + } else { + if (sver == pCommitter->skmTable.pTSchema->version) goto _exit; + } + } + } + + pCommitter->skmTable.suid = suid; + pCommitter->skmTable.uid = uid; + tTSchemaDestroy(pCommitter->skmTable.pTSchema); + pCommitter->skmTable.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); + if (pCommitter->skmTable.pTSchema == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + +_exit: + return code; +} + +static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { int32_t code = 0; - if (pCommitter->pTSchema) { - if (pCommitter->suid == suid) { + if (pCommitter->skmRow.pTSchema) { + if (pCommitter->skmRow.suid == suid) { if (suid == 0) { - if (pCommitter->uid == uid && sver == pCommitter->pTSchema->version) goto _exit; + if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit; } else { - if (sver == pCommitter->pTSchema->version) goto _exit; + if (sver == pCommitter->skmRow.pTSchema->version) goto _exit; } } } - pCommitter->suid = suid; - pCommitter->uid = uid; - tTSchemaDestroy(pCommitter->pTSchema); - pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); - if (pCommitter->pTSchema == NULL) { + pCommitter->skmRow.suid = suid; + pCommitter->skmRow.uid = uid; + tTSchemaDestroy(pCommitter->skmRow.pTSchema); + pCommitter->skmRow.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); + if (pCommitter->skmRow.pTSchema == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -377,7 +406,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); - code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); + code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; tBlockReset(pBlock); @@ -407,14 +436,14 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } _append_mem_row: - code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema); if (code) goto _err; tsdbTbDataIterNext(pIter); pRow1 = tsdbTbDataIterGet(pIter); if (pRow1) { if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { - code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); + code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; } else { pRow1 = NULL; @@ -481,11 +510,11 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter } // update schema - code = tsdbCommitterUpdateSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow)); + code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; // append - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -589,11 +618,11 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S tBlockDataReset(pBlockData); pRow = tsdbTbDataIterGet(pIter); - code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); + code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; while (true) { if (pRow) break; - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema); if (code) goto _err; tsdbTbDataIterNext(pIter); @@ -602,7 +631,8 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S int32_t c = tBlockCmprFn(&(SBlock){}, pBlock); if (c == 0) { - code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); + code = + tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; } else if (c > 0) { pRow = NULL; @@ -955,7 +985,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->aBlockIdxN); tMapDataClear(&pCommitter->nBlockMap); tBlockDataClear(&pCommitter->nBlockData); - tTSchemaDestroy(pCommitter->pTSchema); + tTSchemaDestroy(pCommitter->skmTable.pTSchema); + tTSchemaDestroy(pCommitter->skmRow.pTSchema); } static int32_t tsdbCommitData(SCommitter *pCommitter) {