diff --git a/source/libs/parser/src/parInsertData.c b/source/libs/parser/src/parInsertData.c index fa5a3f4cd0f63b6cddaf2700edfb9d2c0db4fc4b..3b1de7e20d303d7d13d31459fcab0cde071ef1b1 100644 --- a/source/libs/parser/src/parInsertData.c +++ b/source/libs/parser/src/parInsertData.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -// clang-format on +// clang-format off #include "parInsertData.h" #include "catalog.h" @@ -37,14 +37,14 @@ typedef struct SBlockKeyInfo { typedef struct { int32_t index; - SArray* rowArray; // array of merged rows(mem allocated by tRealloc) + SArray* rowArray; // array of merged rows(mem allocated by tRealloc/free by tFree) STSchema* pSchema; + int64_t tbUid; // suid for child table, uid for normal table } SBlockRowMerger; -static void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) { +static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) { if (pMerger) { pMerger->index = -1; - taosMemoryFreeClear(pMerger->pSchema); } } @@ -57,6 +57,7 @@ static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) { taosArrayDestroy(pMerger->rowArray); taosMemoryFreeClear(pMerger->pSchema); + taosMemoryFree(pMerger); } } @@ -430,15 +431,15 @@ static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) { return NULL; } -static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows, +static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows, SBlockRowMerger** pBlkRowMerger, int32_t rowSize) { ASSERT(nDupRows > 1); SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1); ASSERT(pStartKeyTp->skey == pEndKeyTp->skey); - STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr; // TODO: optimization if end row is all normal #if 0 + STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr; if(isNormal(pEndRow)) { // set the end row if it is normal and return directly pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr; return TSDB_CODE_SUCCESS; @@ -461,14 +462,25 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe } } + if ((*pBlkRowMerger)->pSchema) { + if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) { + taosMemoryFreeClear((*pBlkRowMerger)->pSchema); + } else { + if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) { + taosMemoryFreeClear((*pBlkRowMerger)->pSchema); + } + } + } + if (!(*pBlkRowMerger)->pSchema) { - (*pBlkRowMerger)->pSchema = tdGetSTSChemaFromSSChema( - dataBuf->pTableMeta->schema, dataBuf->pTableMeta->tableInfo.numOfColumns, dataBuf->pTableMeta->sversion); + (*pBlkRowMerger)->pSchema = + tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion); if (!(*pBlkRowMerger)->pSchema) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } + (*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid; } void* pDestRow = NULL; @@ -491,7 +503,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe SArray* pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal)); for (int32_t i = 0; i < pSchema->numOfCols; ++i) { SColVal colVal = {0}; - for (int32_t j = 0; j < nDupRows; ++i) { + for (int32_t j = 0; j < nDupRows; ++j) { tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal); if (!colVal.isNone) { break; @@ -512,6 +524,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo, SBlockRowMerger** ppBlkRowMerger) { SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; + STableMeta* pTableMeta = dataBuf->pTableMeta; int16_t nRows = pBlocks->numOfRows; // size is less than the total size, since duplicated rows may be removed. @@ -566,11 +579,13 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p } if ((j - i) > 1) { - if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { + if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { return TSDB_CODE_FAILED; } (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); - hasDup = true; + if (!hasDup) { + hasDup = true; + } i = j; } else { if (hasDup) { @@ -585,7 +600,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p if ((j - i) > 1) { ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey); - if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { + if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { return TSDB_CODE_FAILED; } (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); @@ -594,7 +609,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p } dataBuf->ordered = true; - pBlocks->numOfRows = i + 1; + pBlocks->numOfRows = nextPos + 1; } dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;