diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2cbf1b23d4aed22642da838a022d5dbeedda3bb8..3e1cf0c033825fca256a6c540884ccc3e3b092fb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor typedef struct STableQueryInfo { TSKEY lastKey; // last check ts, todo remove it later SResultRowPosition pos; // current active time window -// uint64_t uid; // table uid // int32_t groupIndex; // group id in table list // SVariant tag; // SResultRowInfo resInfo; // result info @@ -433,10 +432,9 @@ typedef struct STableIntervalOperatorInfo { typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SAggSupporter aggSup; STableQueryInfo *current; - uint32_t groupId; + uint64_t groupId; SGroupResInfo groupResInfo; STableQueryInfo *pTableQueryInfo; @@ -633,7 +631,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index ec086dbb77858fa90344769c2356b951ae4dad6e..98b72cf4c25a0deb6e25a21877aeb90b4f66d493 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -114,7 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { -// ASSERT(0); // SResultRow* pRow = pResultRowInfo->pResult[i]; // if (pRow->closed) { // continue; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ca4a12b7ff0a492e42837b13f66d467acc1e5bf9..25e5c2ee00f904fe34d6d5732edd38c52cee366f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -242,8 +242,7 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); -static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, - SExecTaskInfo* pTaskInfo); +static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo); SArray* getOrderCheckColumns(STaskAttr* pQuery); @@ -511,8 +510,6 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR // pResultRowInfo->curPos = pResultRowInfo->size; pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; - -// int64_t index = pResultRowInfo->curPos; SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur, POINTER_BYTES); } else { @@ -2887,7 +2884,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; - if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + if (!isRowEntryInitialized(pResInfo)) { continue; } @@ -3051,18 +3048,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { blockDataUpdateTsWindow(pBlock); } -void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { +void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo) { // for simple group by query without interval, all the tables belong to one group result. int64_t uid = 0; - int64_t tid = 0; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; SResultRow* pResultRow = - doSetResultOutBufByKey_rv(pAggInfo->pResultBuf, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), - true, uid, pTaskInfo, false, &pAggInfo->aggSup); + doSetResultOutBufByKey_rv(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId), + true, groupId, pTaskInfo, false, &pAggInfo->aggSup); assert(pResultRow != NULL); /* @@ -3070,7 +3066,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize); + int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { return; } @@ -3079,18 +3075,15 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); } -void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo, - STableQueryInfo* pTableQueryInfo, SAggOperatorInfo* pAggInfo) { - // lastKey needs to be updated - pTableQueryInfo->lastKey = nextKey; - if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == tableGroupId) { +void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) { + if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) { return; } - doSetTableGroupOutputBuf(pAggInfo, numOfOutput, tableGroupId, pTaskInfo); + doSetTableGroupOutputBuf(pAggInfo, numOfOutput, groupId, pTaskInfo); // record the current active group id - pAggInfo->groupId = tableGroupId; + pAggInfo->groupId = groupId; } void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { @@ -4841,7 +4834,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; int32_t order = TSDB_ORDER_ASC; @@ -4866,6 +4861,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again + setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); @@ -4885,8 +4881,11 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { #endif } - finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); + closeAllResultRows(&pAggInfo->binfo.resultRowInfo); + finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, + &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); + initGroupResInfo(&pAggInfo->groupResInfo, &pAggInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; } @@ -4905,9 +4904,13 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) return NULL; } - getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes); - doSetOperatorCompleted(pOperator); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset); + if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + } + doSetOperatorCompleted(pOperator); return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; } @@ -5021,74 +5024,6 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi return true; } -static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SAggOperatorInfo* pAggInfo = pOperator->info; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset); - if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } - - return pInfo->pRes; - } - - // table scan order - int32_t order = TSDB_ORDER_ASC; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - break; - } - - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // if (downstream->operatorType == OP_TableScan) { - // STableScanInfo* pScanInfo = downstream->info; - // order = getTableScanOrder(pScanInfo); - // } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - - TSKEY key = 0; - if (order == TSDB_ORDER_ASC) { - key = pBlock->info.window.ekey; - TSKEY_MAX_ADD(key, 1); - } else { - key = pBlock->info.window.skey; - TSKEY_MIN_SUB(key, -1); - } - -// setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, -// pAggInfo); - doAggregateImpl(pOperator, 0, pInfo->pCtx); - } - - pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pInfo->resultRowInfo); - updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); - - initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); - toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset); - - if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } - - return pInfo->pRes; -} - static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -5887,7 +5822,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); @@ -5896,7 +5830,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + pOperator->resultInfo.capacity = 4096; + pOperator->resultInfo.threshold = 4096 * 0.75; + + int32_t numOfGroup = 10; // todo replaced with true value + pInfo->groupId = INT32_MIN; + initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup); + pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->numOfScalarExpr = numOfScalarExpr; if (pInfo->pScalarExprInfo != NULL) { @@ -5910,11 +5850,11 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; - pOperator->pTaskInfo = pTaskInfo; pOperator->_openFn = doOpenAggregateOptr; pOperator->getNextFn = getAggregateResult; pOperator->closeFn = destroyAggOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; pOperator->decodeResultRow = aggDecodeResultRow; @@ -6005,46 +5945,6 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { tsem_destroy(&pExInfo->ready); } -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { - SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); - - int32_t numOfRows = 1; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); - pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { - goto _error; - } - - size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup); - - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiTableAggregate"; - // pOperator->operatorType = OP_MultiTableAggregate; - pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - - pOperator->getNextFn = doMultiTableAggregate; - pOperator->closeFn = destroyAggOperatorInfo; - code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - return NULL; -} - static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { SArray* pList = taosArrayInit(4, sizeof(int32_t)); for(int32_t i = 0; i < numOfCols; ++i) { @@ -6824,16 +6724,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else { ASSERT(0); - } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { - size_t size = taosArrayGetSize(pPhyNode->pChildren); - assert(size == 1); - - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); - } - }*/ + } taosMemoryFree(ops); return pOptr; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9e62600d0f4a9252d15dcc75ab2a48a4043729e6..7c04f9485c35126c0a313a5fd28fa18cb8404b70 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -65,6 +65,21 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { #endif } +// relocated the column data according to the slotId +static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) { + int32_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + if (!pmInfo->output) { + continue; + } + + ASSERT(pmInfo->colId == p->info.colId); + taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p); + } +} + int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableScanInfo* pInfo = pOperator->info; @@ -110,50 +125,36 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; } - } else { - // failed to load the block sma data, data block statistics does not exist, load data block instead - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; - } - return TSDB_CODE_SUCCESS; - } - if (*status == FUNC_DATA_REQUIRED_DATA_LOAD) { - // todo filter data block according to the block sma data firstly -#if 0 - if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = FUNC_DATA_REQUIRED_FILTEROUT; return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; } -#endif + } - pCost->totalCheckedRows += pBlock->info.rows; - pCost->loadBlocks += 1; + ASSERT (*status == FUNC_DATA_REQUIRED_DATA_LOAD); - SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - if (pCols == NULL) { - return terrno; - } + // todo filter data block according to the block sma data firstly +#if 0 + if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + return TSDB_CODE_SUCCESS; + } +#endif - // relocated the column data into the correct slotId - int32_t numOfCols = pBlock->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { - continue; - } + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; - ASSERT(pColMatchInfo->colId == p->info.colId); - taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); -// taosArraySet(pBlock->pBlockAgg) - } + SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); + if (pCols == NULL) { + return terrno; } + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + doFilter(pTableScanInfo->pFilterNode, pBlock); if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 72f2ef9cc3ef9c14a50a480af9d839546a382dce..0d7e984ae7c8262a5df4f558b84219902432886d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -386,19 +386,24 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { GET_TYPED_DATA(prev, uint64_t, type, buf); uint64_t val = GET_UINT64_VAL(tval); - UPDATE_DATA(pCtx, prev, val, numOfElems, isMinFunc, key); - } else if (type == TSDB_DATA_TYPE_DOUBLE) { - double prev = 0; - GET_TYPED_DATA(prev, double, type, buf); + if ((prev < val) ^ isMinFunc) { + *(uint64_t*) buf = val; + for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) { + SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i]; + if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor + __ctx->tag.i = key; + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + } + __ctx->fpSet.process(__ctx); + } + } + } else if (type == TSDB_DATA_TYPE_DOUBLE) { double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, prev, val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(double*) buf, val, numOfElems, isMinFunc, key); } else if (type == TSDB_DATA_TYPE_FLOAT) { - float prev = 0; - GET_TYPED_DATA(prev, float, type, buf); - double val = GET_DOUBLE_VAL(tval); - UPDATE_DATA(pCtx, prev, (float)val, numOfElems, isMinFunc, key); + UPDATE_DATA(pCtx, *(float*) buf, val, numOfElems, isMinFunc, key); } return numOfElems;