From 56d26973080fd6da11b594862138e81baaf9378e Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 30 Jul 2022 12:46:40 +0800 Subject: [PATCH] fix: new tRowMergerInit2 for ts row merging --- source/dnode/vnode/src/inc/tsdb.h | 3 + source/dnode/vnode/src/tsdb/tsdbRead.c | 50 ++++++++---- source/dnode/vnode/src/tsdb/tsdbUtil.c | 108 ++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d8c84e952b..bbf100cf3a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -90,6 +90,9 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2); void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); SColVal *tRowIterNext(SRowIter *pIter); // SRowMerger +int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); +int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); + int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); void tRowMergerClear(SRowMerger *pMerger); int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0eec715bd1..9545ded927 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -143,7 +143,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); @@ -1230,7 +1230,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMerge(&merge, pRow); - doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMergerGetRow(&merge, &pTSRow); } @@ -1246,7 +1246,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -1291,12 +1291,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ik.ts == key) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { tRowMerge(&merge, pRow); - doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); @@ -1336,11 +1336,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); if (ik.ts == k.ts) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { @@ -2097,7 +2097,8 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea } } -int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, + STsdbReader* pReader) { while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { @@ -2116,7 +2117,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMer break; } - tRowMerge(pMerger, pRow); + int32_t sversion = TSDBROW_SVERSION(pRow); + STSchema* pTSchema = NULL; + if (sversion != pReader->pSchema->version) { + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + } else { + pTSchema = pReader->pSchema; + } + + tRowMergerAdd(pMerger, pRow, pTSchema); } return TSDB_CODE_SUCCESS; @@ -2229,7 +2238,7 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { if (pReader->pSchema == NULL) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); - } else if (pReader->pSchema->version < sversion) { + } else if (pReader->pSchema->version != sversion) { taosMemoryFreeClear(pReader->pSchema); metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); } @@ -2240,10 +2249,17 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe SRowMerger merge = {0}; TSDBKEY k = TSDBROW_KEY(pRow); - updateSchema(pRow, uid, pReader); + // updateSchema(pRow, uid, pReader); + int32_t sversion = TSDBROW_SVERSION(pRow); + STSchema* pTSchema = NULL; + if (sversion != pReader->pSchema->version) { + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + } else { + pTSchema = pReader->pSchema; + } - tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader); + tRowMergerInit2(&merge, pReader->pSchema, pRow, pTSchema); + doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); } @@ -2259,18 +2275,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo updateSchema(piRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, piRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, pRow); - doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } else { updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, pTSRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 3e05b75dd0..211a330599 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tdataformat.h" #include "tsdb.h" // SMapData ======================================================================= @@ -568,6 +569,95 @@ SColVal *tRowIterNext(SRowIter *pIter) { } // SRowMerger ====================================================== + +int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + TSDBKEY key = TSDBROW_KEY(pRow); + SColVal *pColVal = &(SColVal){0}; + STColumn *pTColumn; + int32_t iCol = 0; + + pMerger->pTSchema = pResTSchema; + pMerger->version = key.version; + + pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal)); + if (pMerger->pArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // ts + pTColumn = &pTSchema->columns[iCol++]; + + ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // other + for (; iCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) { + pTColumn = &pResTSchema->columns[iCol]; + if (pTSchema->columns[iCol].colId != pTColumn->colId) { + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + continue; + } + + tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + for (; iCol < pResTSchema->numOfCols; ++iCol) { + pTColumn = &pResTSchema->columns[iCol]; + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + } + +_exit: + return code; +} + +int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + TSDBKEY key = TSDBROW_KEY(pRow); + SColVal *pColVal = &(SColVal){0}; + STColumn *pTColumn; + int32_t iCol; + + ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); + + for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && iCol < pTSchema->numOfCols; ++iCol) { + pTColumn = &pMerger->pTSchema->columns[iCol]; + if (pTSchema->columns[iCol].colId != pTColumn->colId) { + continue; + } + + tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); + + if (key.version > pMerger->version) { + if (!pColVal->isNone) { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else if (key.version < pMerger->version) { + SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); + if (tColVal->isNone && !pColVal->isNone) { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { + ASSERT(0); + } + } + + pMerger->version = key.version; + +_exit: + return code; +} + int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); @@ -1401,7 +1491,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_BOOL: break; - case TSDB_DATA_TYPE_TINYINT:{ + case TSDB_DATA_TYPE_TINYINT: { pColAgg->sum += colVal.value.i8; if (pColAgg->min > colVal.value.i8) { pColAgg->min = colVal.value.i8; @@ -1411,7 +1501,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_SMALLINT:{ + case TSDB_DATA_TYPE_SMALLINT: { pColAgg->sum += colVal.value.i16; if (pColAgg->min > colVal.value.i16) { pColAgg->min = colVal.value.i16; @@ -1441,7 +1531,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_FLOAT:{ + case TSDB_DATA_TYPE_FLOAT: { pColAgg->sum += colVal.value.f; if (pColAgg->min > colVal.value.f) { pColAgg->min = colVal.value.f; @@ -1451,7 +1541,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_DOUBLE:{ + case TSDB_DATA_TYPE_DOUBLE: { pColAgg->sum += colVal.value.d; if (pColAgg->min > colVal.value.d) { pColAgg->min = colVal.value.d; @@ -1463,7 +1553,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } case TSDB_DATA_TYPE_VARCHAR: break; - case TSDB_DATA_TYPE_TIMESTAMP:{ + case TSDB_DATA_TYPE_TIMESTAMP: { if (pColAgg->min > colVal.value.i64) { pColAgg->min = colVal.value.i64; } @@ -1474,7 +1564,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } case TSDB_DATA_TYPE_NCHAR: break; - case TSDB_DATA_TYPE_UTINYINT:{ + case TSDB_DATA_TYPE_UTINYINT: { pColAgg->sum += colVal.value.u8; if (pColAgg->min > colVal.value.u8) { pColAgg->min = colVal.value.u8; @@ -1484,7 +1574,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_USMALLINT:{ + case TSDB_DATA_TYPE_USMALLINT: { pColAgg->sum += colVal.value.u16; if (pColAgg->min > colVal.value.u16) { pColAgg->min = colVal.value.u16; @@ -1494,7 +1584,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_UINT:{ + case TSDB_DATA_TYPE_UINT: { pColAgg->sum += colVal.value.u32; if (pColAgg->min > colVal.value.u32) { pColAgg->min = colVal.value.u32; @@ -1504,7 +1594,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_UBIGINT:{ + case TSDB_DATA_TYPE_UBIGINT: { pColAgg->sum += colVal.value.u64; if (pColAgg->min > colVal.value.u64) { pColAgg->min = colVal.value.u64; -- GitLab