From a036d2f3ff682705112f299034b483293fc38d52 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Apr 2022 13:07:23 +0800 Subject: [PATCH] fix(query): fix the bug caused by refactor in first/last function implementation. --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 17 ++++++------- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/src/executorimpl.c | 4 +-- source/libs/function/src/builtinsimpl.c | 25 +++++++++++++------ source/libs/scalar/src/scalar.c | 4 +-- .../libs/scalar/test/filter/filterTests.cpp | 2 +- .../libs/scalar/test/scalar/scalarTests.cpp | 6 ++--- 9 files changed, 36 insertions(+), 28 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 15f3246013..043f2d1d12 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -196,7 +196,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f3bcf4bafc..975d097205 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1076,8 +1076,8 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { - if (0 == numOfRows) { +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) { + if (0 == numOfRows || numOfRows <= existRows) { return TSDB_CODE_SUCCESS; } @@ -1088,19 +1088,16 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) } pColumn->varmeta.offset = (int32_t*)tmp; - memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows); - - pColumn->varmeta.length = 0; - pColumn->varmeta.allocLen = 0; - taosMemoryFreeClear(pColumn->pData); + memset(&pColumn->varmeta.offset[existRows], 0, sizeof(int32_t) * (numOfRows - existRows)); } else { char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + int32_t oldLen = BitmapLen(existRows); pColumn->nullbitmap = tmp; - memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); if (pColumn->info.type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; @@ -1136,7 +1133,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = colInfoDataEnsureCapacity(p, numOfRows); + code = colInfoDataEnsureCapacity(p, pDataBlock->info.rows, numOfRows); if (code) { return code; } @@ -1181,7 +1178,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows); + int32_t code = colInfoDataEnsureCapacity(pDst, 0, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9282f7197e..cf62ea8714 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -141,7 +141,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colInfo.info.colId = pColSchema->colId; colInfo.info.type = pColSchema->type; - if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) { + if (colInfoDataEnsureCapacity(&colInfo, 0, numOfRows) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ab933f840a..2ad34d1561 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -392,7 +392,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, SColumnInfoData colInfo = {{0}, 0}; colInfo.info = pCond->colList[i]; - int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity); + int32_t code = colInfoDataEnsureCapacity(&colInfo, 0, pReadHandle->outputCapacity); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eaf09237de..366da58079 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -832,7 +832,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.bytes = sizeof(int64_t); - colInfoDataEnsureCapacity(pColData, 5); + colInfoDataEnsureCapacity(pColData, 0, 5); colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); @@ -1061,7 +1061,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } ASSERT(!IS_VAR_DATA_TYPE(type)); - colInfoDataEnsureCapacity(pColInfo, numOfRows); + colInfoDataEnsureCapacity(pColInfo, 0, numOfRows); if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { int64_t v = pFuncParam->param.i; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3b674759a3..5a32a6ffec 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -767,10 +767,18 @@ void percentileFinalize(SqlFunctionCtx* pCtx) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); - pEnv->calcMemSize = pNode->node.resType.bytes; + pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); return true; } +static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { + if (pTsColInfo == NULL) { + return 0; + } + + return *(TSKEY*) colDataGetData(pTsColInfo, rowIndex); +} + // This ordinary first function does not care if current scan is ascending order or descending order scan // the OPTIMIZED version of first function will only handle the ascending order scan int32_t firstFunction(SqlFunctionCtx *pCtx) { @@ -792,8 +800,8 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL; - TSKEY startKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, 0) : 0); - TSKEY endKey = *(int64_t*)(pInput->pPTS ? colDataGetData(pInput->pPTS, pInput->totalRows - 1) : 0); + TSKEY startKey = getRowPTs(pInput->pPTS, 0); + TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; @@ -812,12 +820,14 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { } char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); - TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->numOfRes = 1; } numOfElems++; @@ -838,12 +848,13 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) { } char* data = colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); - TSKEY cts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); + pResInfo->numOfRes = 1; } numOfElems++; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0432ae1df8..337a773d5c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -30,7 +30,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) { pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pColumnData); @@ -45,7 +45,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) { in.columnData = createColumnInfoData(&pValueNode->node.resType, 1); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); - colInfoDataEnsureCapacity(out->columnData, 1); + colInfoDataEnsureCapacity(out->columnData, 0, 1); int32_t code = vectorConvertImpl(&in, out); sclFreeParam(&in); diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 26ef5dbd44..42998aba00 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 61ef2fdce2..09d6528dd8 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); + colInfoDataEnsureCapacity(&idata, 0, rows); taosArrayPush(res->pDataBlock, &idata); @@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in res->info.numOfCols++; SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock); - colInfoDataEnsureCapacity(pColumn, rowNum); + colInfoDataEnsureCapacity(pColumn, 0, rowNum); for (int32_t i = 0; i < rowNum; ++i) { colDataAppend(pColumn, i, (const char *)value, false); @@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t input->numOfRows = num; input->columnData->info = createColumnInfo(0, type, bytes); - colInfoDataEnsureCapacity(input->columnData, num); + colInfoDataEnsureCapacity(input->columnData, 0, num); if (setVal) { for (int32_t i = 0; i < num; ++i) { -- GitLab