diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 88ac1532c2ec01d623a761e3f2699c7e3bb5d38c..7d977b0d235b91e41b8d68d4187068ce47e5ac12 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -126,7 +126,7 @@ enum { enum { MAIN_SCAN = 0x0u, - REVERSE_SCAN = 0x1u, + REVERSE_SCAN = 0x1u, // todo remove it REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan MERGE_STAGE = 0x20u, }; @@ -222,12 +222,12 @@ enum { typedef struct tExprNode { int32_t nodeType; union { - struct { - int32_t optr; // binary operator - void *info; // support filter operation on this expression only available for leaf node - struct tExprNode *pLeft; // left child pointer - struct tExprNode *pRight; // right child pointer - } _node; +// struct { +// int32_t optr; // binary operator +// void *info; // support filter operation on this expression only available for leaf node +// struct tExprNode *pLeft; // left child pointer +// struct tExprNode *pRight; // right child pointer +// } _node; SSchema *pSchema;// column node struct SVariant *pVal; // value node @@ -237,12 +237,6 @@ typedef struct tExprNode { int32_t functionId; int32_t num; struct SFunctionNode *pFunctNode; - // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the - // calculation instead. - // E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes. - // The concat function, concat(col1, col2), is a binary scalar - // operator and is kept in the attribute of _node. - struct tExprNode **pChild; } _function; struct { @@ -271,9 +265,10 @@ typedef struct SAggFunctionInfo { } SAggFunctionInfo; struct SScalarParam { - SColumnInfoData *columnData; - SHashObj *pHashFilter; - int32_t numOfRows; + SColumnInfoData *columnData; + SHashObj *pHashFilter; + void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value + int32_t numOfRows; }; int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, @@ -281,10 +276,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId); -tExprNode* exprTreeFromBinary(const void* data, size_t size); - -tExprNode* exprdup(tExprNode* pTree); - void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 93e81aa70ee2d9d7a3658adc86d4bd3aa55315ad..9decddc843b045287775468cb5095a93b10bc109 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -333,6 +333,8 @@ typedef struct SScanInfo { typedef struct STableScanInfo { void* dataReader; + SReadHandle readHandle; + SFileBlockLoadRecorder readRecorder; int64_t numOfRows; int64_t elapsedTime; @@ -348,6 +350,11 @@ typedef struct STableScanInfo { SArray* pColMatchInfo; int32_t numOfOutput; + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; +// int32_t* rowCellInfoOffset; + SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; @@ -628,7 +635,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); -int32_t getTableScanOrder(SOperatorInfo* pOperator); +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); @@ -644,12 +651,17 @@ SSDataBlock* loadNextDataBlock(void* param); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); +SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type); +SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); +SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); + SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo, - SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); @@ -704,7 +716,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol); +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 11ca3c1f0df7a9251287c77c2521751f96275123..78ecae864f8471afe800ed1feebac1d3b98c2740 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -649,7 +649,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* p } static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, - bool createDummyCol); + int32_t scanFlag, bool createDummyCol); static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { @@ -660,12 +660,12 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC } } -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { if (pBlock->pBlockAgg != NULL) { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); } else { - doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol); + doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol); } } @@ -712,14 +712,14 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, - bool createDummyCol) { + int32_t scanFlag, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; - pCtx[i].currentStage = MAIN_SCAN; + pCtx[i].currentStage = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; pInput->uid = pBlock->info.uid; @@ -3490,7 +3490,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { break; } - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // pOperator->pRuntimeEnv, true); doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock); @@ -3655,17 +3655,19 @@ _error: return NULL; } -int32_t getTableScanOrder(SOperatorInfo* pOperator) { +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { - return TSDB_ORDER_ASC; + return TSDB_CODE_INVALID_PARA; } else { - return getTableScanOrder(pOperator->pDownstream[0]); + return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag); } } STableScanInfo* pTableScanInfo = pOperator->info; - return pTableScanInfo->cond.order; + *order = pTableScanInfo->cond.order; + *scanFlag = pTableScanInfo->scanFlag; + return TSDB_CODE_SUCCESS; } // this is a blocking operator @@ -3681,6 +3683,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t order = TSDB_ORDER_ASC; + int32_t scanFlag = MAIN_SCAN; + while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3693,11 +3698,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } - int32_t order = getTableScanOrder(pOperator); + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->pScalarExprInfo != NULL) { - int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, + code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, pAggInfo->numOfScalarExpr, NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; @@ -3707,7 +3715,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true); doAggregateImpl(pOperator, 0, pInfo->pCtx); #if 0 // test for encode/decode result info @@ -3988,6 +3996,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } #endif + int32_t order = 0; + int32_t scanFlag = 0; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -4019,15 +4030,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // } // the pDataBlock are always the same one, no need to call this again - int32_t order = getTableScanOrder(pOperator->pDownstream[0]); + int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, - pProjectInfo->pPseudoColInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, pTaskInfo->code); + code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); } int32_t status = handleLimitOffset(pOperator, pBlock); @@ -4626,8 +4636,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); +#if 1 + // todo refactor: add the parameter for tbname function + if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) { + pFuncNode->pParameterList = nodesMakeList(); + ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { // todo handle error + } else { + res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + nodesListAppend(pFuncNode->pParameterList, res); + } + } +#endif int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); @@ -4688,58 +4712,29 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t uint64_t queryId, uint64_t taskId); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractColumnInfo(SNodeList* pNodeList); -static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type); static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); -static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); -static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { - SInterval interval = { - .interval = pTableScanNode->interval, - .sliding = pTableScanNode->sliding, - .intervalUnit = pTableScanNode->intervalUnit, - .slidingUnit = pTableScanNode->slidingUnit, - .offset = pTableScanNode->offset, - }; - - return interval; -} - SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { - SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); if (pDataReader == NULL && terrno != 0) { return NULL; } - SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - - SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - SSDataBlock* pResBlock = createResDataBlock(pDescNode); - - SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - SInterval interval = extractIntervalInfo(pTableScanNode); - SOperatorInfo* pOperator = createTableScanOperatorInfo( - pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock, - pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; @@ -4929,7 +4924,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } -static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { pCond->loadExternalRows = false; pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e3a507bf7cdc7e035039042e6878e004d525b2d5..daf3f5bae8c312f9b6967d0ba83359fb950ebc5b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -287,7 +287,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->pScalarExprInfo != NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eaacb561d50a64ad7d9627692ffcbb86357594aa..b3d915ab93b63fe3fc1831a43416caa737bf14cc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -284,6 +283,27 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } + // currently only the tbname pseudo column + if (pTableScanInfo->numOfPseudoExpr > 0) { + int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId; + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); + + struct SScalarFuncExecFuncs fpSet; + fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_BIGINT; + infoData.info.bytes = sizeof(uint64_t); + colInfoDataEnsureCapacity(&infoData, 0, 1); + + colDataAppendInt64(&infoData, 0, &pBlock->info.uid); + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData}; + + SScalarParam param = {.columnData = pColInfoData}; + fpSet.process(&srcParam, 1, ¶m); + } + return pBlock; } @@ -314,8 +334,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STimeWindow* pWin = &pTableScanInfo->cond.twindow; qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, - GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -359,10 +378,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } -SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, - int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo, - SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, - double sampleRatio, SExecTaskInfo* pTaskInfo) { +SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { + SInterval interval = { + .interval = pTableScanNode->interval, + .sliding = pTableScanNode->sliding, + .intervalUnit = pTableScanNode->intervalUnit, + .slidingUnit = pTableScanNode->slidingUnit, + .offset = pTableScanNode->offset, + }; + + return interval; +} + +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -373,25 +401,40 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon return NULL; } - pInfo->cond = *pCond; - pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]}; + SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; - pInfo->interval = *pInterval; - pInfo->sampleRatio = sampleRatio; - pInfo->dataBlockLoadFlag = dataLoadFlag; - pInfo->pResBlock = pResBlock; - pInfo->pFilterNode = pCondition; - pInfo->dataReader = pDataReader; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; + int32_t numOfCols = 0; + SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - pOperator->name = "TableScanOperator"; // for dubug purpose + int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + if (pTableScanNode->scan.pScanPseudoCols != NULL) { + pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + } + + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sampleRatio = pTableScanNode->ratio; + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReader = pDataReader; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + + pOperator->name = "TableScanOperator"; // for debug purpose pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = numOfOutput; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -1311,7 +1354,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderClear(&mr); colDataAppend(pDst, count, str, false); - // data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes); // dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; // doSetTagValueToResultBuf(dst, data, type, bytes); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 738f4821bde2dd153a4b38457e31bab5977510ba..4cd4112f1006a00205354bab89222c4eb7ed9292 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -775,7 +775,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); @@ -910,7 +910,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { break; } - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); doStateWindowAggImpl(pOperator, pInfo, pBlock); } @@ -1024,7 +1024,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); if (pInfo->invertible) { setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); } @@ -1286,7 +1286,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); doSessionWindowAggImpl(pOperator, pInfo, pBlock); } @@ -1334,7 +1334,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true); + setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); } diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt index 02d530533ccdb551c6c95d13dfa9ba34a41940b4..87f4bb9c646d4ec592b7252da5693b565bb04109 100644 --- a/source/libs/scalar/CMakeLists.txt +++ b/source/libs/scalar/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( ) target_link_libraries(scalar - PRIVATE os util common nodes function qcom + PRIVATE os util common nodes function qcom vnode ) if(${BUILD_TEST}) diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 659d7dcf7e3b3a33d5b741e184da80f7c953c5ec..53f71bc08051c7cb6009f97283ae55e8da78944d 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -26,6 +26,7 @@ typedef struct SScalarCtx { int32_t code; SArray *pBlockList; /* element is SSDataBlock* */ SHashObj *pRes; /* element is SScalarParam */ + void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values } SScalarCtx; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 5231890821d242bc8da03d87965752f76d2ba872..ccc6a23d29092c99fde540ed54840b4a168a0a11 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -250,6 +250,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum = param->numOfRows; } + param->param = ctx->param; return TSDB_CODE_SUCCESS; } @@ -884,7 +885,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { } int32_t code = 0; - SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList}; + SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param}; // TODO: OPT performance ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 0161323e37722475d64cdf2a4e4c6605eed82473..d4a88622e2db99db1d3b7ba75d2b6705d6ca14e7 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1,10 +1,11 @@ #include "function.h" #include "scalar.h" -#include "tdatablock.h" -#include "ttime.h" #include "sclInt.h" #include "sclvector.h" +#include "tdatablock.h" #include "tjson.h" +#include "ttime.h" +#include "vnode.h" typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); @@ -1512,6 +1513,21 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { ASSERT(inputNum == 1); - colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false); + + SMetaReader mr = {0}; + metaReaderInit(&mr, pInput->param, 0); + + uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0); + metaGetTableEntryByUid(&mr, uid); + + char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(str, mr.me.name); + metaReaderClear(&mr); + + for(int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false); + } + + pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; }