未验证 提交 4c3f22f2 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21975 from taosdata/enh/TD-24745

enh: add procedures for udf scalar function in nested queries where outer query is constant table
...@@ -38,7 +38,7 @@ typedef struct SIndefOperatorInfo { ...@@ -38,7 +38,7 @@ typedef struct SIndefOperatorInfo {
SSDataBlock* pNextGroupRes; SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo; } SIndefOperatorInfo;
static SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator); static int32_t doGenerateSourceData(SOperatorInfo* pOperator);
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator); static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator);
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator); static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator);
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols); static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols);
...@@ -215,7 +215,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S ...@@ -215,7 +215,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
if (newGroup) { if (newGroup) {
resetLimitInfoForNextGroup(pLimitInfo); resetLimitInfoForNextGroup(pLimitInfo);
} }
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} }
...@@ -267,7 +267,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -267,7 +267,12 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
if (downstream == NULL) { if (downstream == NULL) {
return doGenerateSourceData(pOperator); code = doGenerateSourceData(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
return (pRes->info.rows > 0) ? pRes : NULL;
} }
while (1) { while (1) {
...@@ -616,7 +621,7 @@ SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { ...@@ -616,7 +621,7 @@ SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
return pList; return pList;
} }
SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { int32_t doGenerateSourceData(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
...@@ -630,14 +635,45 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -630,14 +635,45 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
for (int32_t k = 0; k < pSup->numOfExprs; ++k) { for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId; int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE); if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, outputSlotId);
int32_t type = pExpr[k].base.pParam[0].param.nType;
if (TSDB_DATA_TYPE_NULL == type) {
colDataSetNNULL(pColInfoData, 0, 1);
} else {
colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
}
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
SqlFunctionCtx* pfCtx = &pSup->pCtx[k];
// UDF scalar functions will be calculated here, for example, select foo(n) from (select 1 n).
// UDF aggregate functions will be handled in agg operator.
if (fmIsScalarFunc(pfCtx->functionId)) {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pRes);
int32_t type = pExpr[k].base.pParam[0].param.nType; SColumnInfoData* pResColData = taosArrayGet(pRes->pDataBlock, outputSlotId);
if (TSDB_DATA_TYPE_NULL == type) { SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
colDataSetNNULL(pColInfoData, 0, 1);
SScalarParam dest = {.columnData = &idata};
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
return code;
}
int32_t startOffset = pRes->info.rows;
ASSERT(pRes->info.capacity > 0);
colDataAssign(pResColData, &idata, dest.numOfRows, &pRes->info);
colDataDestroy(&idata);
taosArrayDestroy(pBlockList);
} else {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
} else { } else {
colDataSetVal(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false); return TSDB_CODE_OPS_NOT_SUPPORT;
} }
} }
...@@ -653,7 +689,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -653,7 +689,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
return (pRes->info.rows > 0) ? pRes : NULL; return TSDB_CODE_SUCCESS;
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
......
...@@ -1694,7 +1694,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -1694,7 +1694,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
SCL_ERR_JRET(TSDB_CODE_APP_ERROR); SCL_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
if (1 == res->numOfRows) { SSDataBlock *pb = taosArrayGetP(pBlockList, 0);
if (1 == res->numOfRows && pb->info.rows > 0) {
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
} else { } else {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true); colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
......
...@@ -234,6 +234,11 @@ class TDTestCase: ...@@ -234,6 +234,11 @@ class TDTestCase:
tdSql.checkData(20,6,88) tdSql.checkData(20,6,88)
tdSql.checkData(20,7,1) tdSql.checkData(20,7,1)
tdSql.query("select udf1(1) from (select 1)")
tdSql.checkData(0,0,1)
tdSql.query("select udf1(n) from (select 1 n)")
tdSql.checkData(0,0,1)
# aggregate functions # aggregate functions
tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb") tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册