提交 c6686a0a 编写于 作者: D dapan1121

fix regression caused by code refactoring

上级 fdfb7b58
......@@ -139,11 +139,11 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
......@@ -246,7 +246,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
int32_t tscGetUdfFromNode(SSqlObj *pSql);
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
......
......@@ -240,6 +240,7 @@ typedef struct SQueryInfo {
int32_t havingFieldNum;
bool globalMerge; // need global merge
bool arithmCalOnAgg; // arithmetic calculation on aggregate result.
SArray *pUdfInfo; // user defined function information SArray<SUdfInfo>
} SQueryInfo;
typedef struct {
......@@ -284,7 +285,6 @@ typedef struct {
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
SArray *pUdfInfo; // user defined function information SArray<SUdfInfo>
} SSqlCmd;
typedef struct SResRec {
......
......@@ -306,7 +306,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pMerger->pDesc->pColumnModel->capacity = 1;
// restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
}
......@@ -461,7 +461,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
// primary timestamp column is involved in final result
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
numOfGroupByCols++;
}
......@@ -617,7 +617,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
type = pModel->pFields[i].field.type;
bytes = pModel->pFields[i].field.bytes;
} else if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false, pUdfInfo);
assert(ret == TSDB_CODE_SUCCESS);
......@@ -801,6 +801,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} else {
......@@ -809,6 +818,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
......@@ -825,6 +843,10 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
}
for(int32_t j = 0; j < numOfExpr; ++j) {
if (pCtx[j].functionId < 0) {
continue;
}
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
}
......@@ -837,6 +859,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
......@@ -850,6 +881,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
......@@ -1104,6 +1144,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{ // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
SQLFunctionCtx* pCtx = &pAggInfo->binfo.pCtx[j];
if (pCtx->functionId < 0) {
clearOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity);
continue;
}
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
}
}
......@@ -1155,6 +1200,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
......
......@@ -83,7 +83,7 @@ static int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SAr
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery);
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery);
static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
......@@ -1774,13 +1774,12 @@ void genUdfList(SArray* pUdfInfo, tSqlExpr *pNode) {
}
}
static int32_t checkForUdf(SSqlObj* pSql, SArray* pSelection) {
SSqlCmd* pCmd = &pSql->cmd;
if (pCmd->pUdfInfo != NULL) {
static int32_t checkForUdf(SSqlObj* pSql, SQueryInfo* pQueryInfo, SArray* pSelection) {
if (pQueryInfo->pUdfInfo != NULL) {
return TSDB_CODE_SUCCESS;
}
pCmd->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
pQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(struct SUdfInfo));
size_t nExpr = taosArrayGetSize(pSelection);
......@@ -1789,12 +1788,12 @@ static int32_t checkForUdf(SSqlObj* pSql, SArray* pSelection) {
int32_t type = pItem->pNode->type;
if (type == SQL_NODE_EXPR || type == SQL_NODE_SQLFUNCTION) {
genUdfList(pCmd->pUdfInfo, pItem->pNode);
genUdfList(pQueryInfo->pUdfInfo, pItem->pNode);
}
}
if (taosArrayGetSize(pCmd->pUdfInfo) > 0) {
return tscGetUdfFromNode(pSql);
if (taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
return tscGetUdfFromNode(pSql, pQueryInfo);
} else {
return TSDB_CODE_SUCCESS;
}
......@@ -1845,7 +1844,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
if (type == SQL_NODE_SQLFUNCTION) {
pItem->pNode->functionId = isValidFunction(pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pItem->pNode->functionId < 0) {
SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
SUdfInfo* pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pUdfInfo == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
......@@ -1890,7 +1889,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
addPrimaryTsColIntoResult(pQueryInfo);
}
if (!functionCompatibleCheck(pCmd, pQueryInfo, joinQuery, timeWindowQuery)) {
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2700,7 +2699,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
default: {
SUdfInfo* pUdfInfo = isValidUdf(pCmd->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
SUdfInfo* pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->operand.z, pItem->pNode->operand.n);
if (pUdfInfo == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......@@ -3180,7 +3179,7 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
return true;
}
static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
int32_t startIdx = 0;
int32_t aggUdf = 0;
int32_t scalarUdf = 0;
......@@ -3201,7 +3200,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool
int16_t functionId = pExpr1->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE ? ++aggUdf : ++scalarUdf;
continue;
......@@ -5257,7 +5256,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
// orderby ts query on super table
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
addPrimaryTsColIntoResult(pQueryInfo);
}
}
......@@ -5635,7 +5634,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->base.functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pExpr->base.functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * pExpr->base.functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_SCALAR) {
isProjectionFunction = true;
break;
......@@ -5885,13 +5884,13 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI
// todo refactor
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
if (tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// for projection query on super table, all queries are subqueries
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY)) {
pQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
......@@ -5927,7 +5926,7 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI
pQueryInfo->prjOffset = pQueryInfo->limit.offset;
pQueryInfo->vgroupLimit = -1;
if (tscOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/*
* the offset value should be removed during retrieve data from virtual node, since the
* global order are done in client side, so the offset is applied at the client side
......@@ -6280,7 +6279,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
++numOfAggregation;
}
......@@ -6526,7 +6525,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on super table does not compatible with "group by" syntax
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
if (tscIsProjectionQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -6708,7 +6707,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
char *name = NULL;
if (pExpr->base.functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pSql->cmd.pUdfInfo, -1 * pExpr->base.functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * pExpr->base.functionId - 1);
name = pUdfInfo->name;
} else {
name = aAggs[pExpr->base.functionId].name;
......@@ -7034,7 +7033,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return code;
}
code = checkForUdf(pSql, pSqlNode->pSelNodeList);
code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -7059,7 +7058,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (!tscIsProjectionQuery(pCmd, pQueryInfo) && pQueryInfo->interval.interval == 0) {
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
......@@ -7399,7 +7398,7 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p
}
//REDO function check
if (!functionCompatibleCheck(pCmd, pQueryInfo, joinQuery, timeWindowQuery)) {
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -7589,18 +7588,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY);
}
code = checkForUdf(pSql, pQueryInfo, pSqlNode->pSelNodeList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// parse the group by clause in the first place
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = checkForUdf(pSql, pSqlNode->pSelNodeList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// set where info
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
// set where info
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
......
......@@ -521,7 +521,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
type = pQueryInfo->type;
// while numOfTables equals to 0, it must be Heartbeat
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
assert((pQueryInfo->numOfTables == 0 && (pQueryInfo->command == TSDB_SQL_HB || pSql->cmd.command == TSDB_SQL_RETRIEVE_FUNC)) || pQueryInfo->numOfTables > 0);
}
tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type);
......@@ -1026,10 +1026,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
// support only one udf
if (pCmd->pUdfInfo != NULL && taosArrayGetSize(pCmd->pUdfInfo) > 0) {
if (pQueryInfo->pUdfInfo != NULL && taosArrayGetSize(pQueryInfo->pUdfInfo) > 0) {
pQueryMsg->udfContentOffset = htonl((int32_t) (pMsg - pCmd->payload));
for(int32_t i = 0; i < taosArrayGetSize(pCmd->pUdfInfo); ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, i);
for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, i);
*(int8_t*) pMsg = pUdfInfo->resType;
pMsg += sizeof(pUdfInfo->resType);
......@@ -1843,14 +1843,15 @@ int tscBuildRetrieveFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
char *pMsg = pCmd->payload;
int32_t numOfFuncs = (int32_t)taosArrayGetSize(pCmd->pUdfInfo);
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd);
int32_t numOfFuncs = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo);
SRetrieveFuncMsg *pRetrieveFuncMsg = (SRetrieveFuncMsg *)pMsg;
pRetrieveFuncMsg->num = htonl(numOfFuncs);
pMsg += sizeof(SRetrieveFuncMsg);
for(int32_t i = 0; i < numOfFuncs; ++i) {
SUdfInfo* pUdf = taosArrayGet(pCmd->pUdfInfo, i);
SUdfInfo* pUdf = taosArrayGet(pQueryInfo->pUdfInfo, i);
STR_TO_NET_VARSTR(pMsg, pUdf->name);
pMsg += varDataNetTLen(pMsg);
}
......@@ -2125,15 +2126,17 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SUdfFuncMsg* pFuncMsg = (SUdfFuncMsg *)pSql->res.pRsp;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pFuncMsg->num = htonl(pFuncMsg->num);
assert(pFuncMsg->num == taosArrayGetSize(pCmd->pUdfInfo));
assert(pFuncMsg->num == taosArrayGetSize(pQueryInfo->pUdfInfo));
char* pMsg = pFuncMsg->content;
for(int32_t i = 0; i < pFuncMsg->num; ++i) {
SFunctionInfoMsg* pFunc = (SFunctionInfoMsg*) pMsg;
for(int32_t j = 0; j < pFuncMsg->num; ++j) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, j);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, j);
if (strcmp(pUdfInfo->name, pFunc->name) != 0) {
continue;
}
......@@ -2161,11 +2164,13 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
return pSql->res.code;
}
SQueryInfo* parQueryInfo = tscGetQueryInfo(&parent->cmd, parent->cmd.clauseIndex);
assert(parent->signature == parent && (int64_t)pSql->param == parent->self);
taosArrayDestroy(parent->cmd.pUdfInfo);
taosArrayDestroy(parQueryInfo->pUdfInfo);
parent->cmd.pUdfInfo = pCmd->pUdfInfo; // assigned to parent sql obj.
pCmd->pUdfInfo = NULL;
parQueryInfo->pUdfInfo = pQueryInfo->pUdfInfo; // assigned to parent sql obj.
pQueryInfo->pUdfInfo = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -2479,7 +2484,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo);
} else if ((UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscSetResRawPtr(pRes, pQueryInfo);
} else if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
} else if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscSetResRawPtr(pRes, pQueryInfo);
}
......@@ -2607,7 +2612,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
return tscGetTableMeta(pSql, pTableMetaInfo);
}
int32_t tscGetUdfFromNode(SSqlObj *pSql) {
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
if (NULL == pNew) {
tscError("%p malloc failed for new sqlobj to get user-defined functions", pSql);
......@@ -2618,14 +2623,24 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql) {
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_RETRIEVE_FUNC;
pNew->cmd.pUdfInfo = taosArrayInit(4, sizeof(SUdfInfo));
for(int32_t i = 0; i < taosArrayGetSize(pSql->cmd.pUdfInfo); ++i) {
if (tscAddQueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) {
tscError("%p malloc failed for new queryinfo", pSql);
tscFreeSqlObj(pNew);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd, 0);
pNewQueryInfo->pUdfInfo = taosArrayInit(4, sizeof(SUdfInfo));
for(int32_t i = 0; i < taosArrayGetSize(pQueryInfo->pUdfInfo); ++i) {
SUdfInfo info = {0};
SUdfInfo* p1 = taosArrayGet(pSql->cmd.pUdfInfo, i);
SUdfInfo* p1 = taosArrayGet(pQueryInfo->pUdfInfo, i);
info = *p1;
info.name = strdup(p1->name);
taosArrayPush(pNew->cmd.pUdfInfo, &info);
taosArrayPush(pNewQueryInfo->pUdfInfo, &info);
}
pNew->cmd.active = pNewQueryInfo;
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
tscError("%p malloc failed for payload to get table meta", pSql);
......
......@@ -615,7 +615,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if ((pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
int16_t functionId = tscIsProjectionQuery(&pNew->cmd, pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
int16_t functionId = tscIsProjectionQuery(pQueryInfo)? TSDB_FUNC_PRJ : TSDB_FUNC_TS;
tscAddFuncInSelectClause(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelNodeList(pNew, 0);
......@@ -636,7 +636,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pVgroupTables != NULL);
if (tscNonOrderedProjectionQueryOnSTable(&pNew->cmd, pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
pTableMetaInfo->pVgroupTables = p;
......@@ -1429,7 +1429,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
SSubqueryState* pState = &pParentSql->subState;
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && numOfRows == 0) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -1561,7 +1561,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
}
SQueryInfo* p = tscGetQueryInfo(&pSub->cmd, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, p, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
if (orderedPrjQuery) {
break;
}
......@@ -1586,7 +1586,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(&pSub->cmd, pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows &&
pSub->res.completed) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -1788,7 +1788,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
}
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0))) {
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return;
}
......@@ -1800,7 +1800,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(&pSql->cmd, pQueryInfo, 0)) {
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
......@@ -2773,6 +2773,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
SQueryInfo *pQueryInfo2 = tscGetQueryInfo(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
......@@ -2781,7 +2783,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
int32_t functionId = pCtx->functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pParentSql->cmd.pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo2->pUdfInfo, -1 * functionId - 1);
code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
......@@ -2862,8 +2864,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
pParentSql->self, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
SSqlCmd* pCmd = &pSql->cmd;
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, 0)) {
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
......
......@@ -127,7 +127,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab
}
// for ordered projection query, iterate all qualified vnodes sequentially
if (tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
......@@ -138,7 +138,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab
return false;
}
bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
/*
......@@ -156,7 +156,7 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
......@@ -179,8 +179,8 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t
}
// not order by timestamp projection query on super table
bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
......@@ -188,8 +188,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo,
return pQueryInfo->order.orderColId < 0;
}
bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pCmd, pQueryInfo, tableIndex)) {
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) {
return false;
}
......@@ -197,14 +197,14 @@ bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, in
return pQueryInfo->order.orderColId >= 0;
}
bool tscIsProjectionQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscSqlExprGet(pQueryInfo, i)->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
......@@ -249,7 +249,7 @@ bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
}
if (tscIsProjectionQuery(pCmd, pQueryInfo)) {
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
}
......@@ -422,6 +422,15 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo) {
}
int32_t functionId = pExpr->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return true;
}
continue;
}
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......@@ -799,6 +808,12 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree(pUp);
}
if (pCmd->subCmd) {
pQueryInfo->pUdfInfo = taosArrayDestroy(pQueryInfo->pUdfInfo);
} else {
pQueryInfo->pUdfInfo = tscDestroyUdfArrayList(pQueryInfo->pUdfInfo);
}
freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta);
......@@ -842,11 +857,6 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if (pCmd->subCmd) {
pCmd->pUdfInfo = taosArrayDestroy(pCmd->pUdfInfo);
} else {
pCmd->pUdfInfo = tscDestroyUdfArrayList(pCmd->pUdfInfo);
}
tscFreeQueryInfo(pCmd, removeMeta);
}
......@@ -2683,12 +2693,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
goto _error;
}
if (pCmd->pUdfInfo) {
pnCmd->pUdfInfo = taosArrayDup(pCmd->pUdfInfo);
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0);
if (pQueryInfo->pUdfInfo) {
pNewQueryInfo->pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo);
}
pNewQueryInfo->command = pQueryInfo->command;
pnCmd->active = pNewQueryInfo;
......@@ -3096,7 +3107,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
}
return tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) &&
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
}
......@@ -3114,7 +3125,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pCmd, pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
......@@ -3489,9 +3500,15 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
functionId = TSDB_FUNC_STDDEV;
}
SUdfInfo* pUdfInfo = NULL;
if (functionId < 0) {
pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
}
int32_t inter = 0;
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType,
&pse->resBytes, &inter, 0, false, NULL);
&pse->resBytes, &inter, 0, false, pUdfInfo);
pse->colType = pse->resType;
pse->colBytes = pse->resBytes;
......@@ -3558,8 +3575,14 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
functionId = TSDB_FUNC_STDDEV;
}
SUdfInfo* pUdfInfo = NULL;
if (functionId < 0) {
pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
}
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter,
0, false, NULL);
0, false, pUdfInfo);
}
}
......@@ -3633,7 +3656,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
pQueryAttr->pUdfInfo = pQueryInfo->pUdfInfo;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
} else {
......
......@@ -236,6 +236,7 @@ typedef struct SQueryAttr {
SMemRef memRef;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
SArray *pUdfInfo; // no need to free
} SQueryAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
......@@ -495,6 +496,7 @@ typedef struct SMultiwayMergeInfo {
bool hasPrev;
bool groupMix;
SArray *udfInfo;
} SMultiwayMergeInfo;
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
......@@ -515,7 +517,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -530,6 +532,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
......@@ -571,4 +574,6 @@ void freeQueryAttr(SQueryAttr *pQuery);
int32_t getMaximumIdleDurationSec();
void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
#endif // TDENGINE_QEXECUTOR_H
......@@ -701,39 +701,72 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
return num;
}
static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t idx) {
void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type) {
int32_t output = 0;
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]);
if (pUdfInfo->isScript) {
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
} else {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pUdfInfo == NULL || pUdfInfo->funcs[type] == NULL) {
qError("empty udf function, type:%d", type);
return;
}
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[type]);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList,
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
}
switch (type) {
case TSDB_UDF_FUNC_NORMAL:
if (pUdfInfo->isScript) {
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
} else {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList,
pCtx->pOutput, interBuf, (char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
}
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
pCtx->resultInfo->numOfRes = output;
} else {
pCtx->resultInfo->numOfRes += output;
}
if (pCtx->resultInfo->numOfRes > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
case TSDB_UDF_FUNC_MERGE:
(*(udfMergeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_MERGE])(pCtx->pInput, pCtx->size, pCtx->pOutput, &output, &pUdfInfo->init);
// set the output value exist
pCtx->resultInfo->numOfRes = output;
} else {
pCtx->resultInfo->numOfRes += output;
}
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
if (pCtx->resultInfo->numOfRes > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
return;
case TSDB_UDF_FUNC_FINALIZE: {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pUdfInfo->pScriptCtx, pCtx->pOutput, &output);
} else {
(*(udfFinalizeFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx->pOutput, interBuf, &output, &pUdfInfo->init);
}
// set the output value exist
pCtx->resultInfo->numOfRes = output;
if (output > 0) {
pCtx->resultInfo->hasResult = DATA_SET_FLAG;
}
break;
}
}
qError("empty udf function");
return;
}
......@@ -768,7 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
// aAggs[functionId].xFunction(&pCtx[k]);
if (functionId < 0) { // load the script and exec, pRuntimeEnv->pUdfInfo
doInvokeUdf(pRuntimeEnv, &pCtx[k], 0);
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
aAggs[functionId].xFunction(&pCtx[k]);
}
......@@ -982,6 +1016,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
pCtx[i].pInput = p->pData;
assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type);
if (pCtx[i].functionId < 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
continue;
}
uint32_t status = aAggs[pCtx[i].functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -1011,7 +1052,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
pCtx[k].startTs = startTs;// this can be set during create the struct
// aAggs[functionId].xFunction(&pCtx[k]);
if (functionId < 0) {
doInvokeUdf(pRuntimeEnv, &pCtx[k], 0);
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
aAggs[functionId].xFunction(&pCtx[k]);
}
......@@ -1033,8 +1075,9 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) {
// load the script and exec
doInvokeUdf(pRuntimeEnv, &pCtx[k], 0);
// load the script and exec
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pCtx[k], 0, TSDB_UDF_FUNC_NORMAL);
} else {
aAggs[pCtx[k].functionId].xFunction(&pCtx[k]);
}
......@@ -1359,7 +1402,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
pInfo->binfo.pCtx[k].size = 1;
int32_t functionId = pInfo->binfo.pCtx[k].functionId;
if (functionId < 0) {
doInvokeUdf(pRuntimeEnv, &pInfo->binfo.pCtx[k], j);
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
doInvokeUdf(pUdfInfo, &pInfo->binfo.pCtx[k], j, TSDB_UDF_FUNC_NORMAL);
} else if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j);
}
......@@ -1848,7 +1892,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_GlobalAggregate: {
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger);
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo);
break;
}
......@@ -3154,6 +3198,21 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
}
}
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) {
SSDataBlock* pDataBlock = pBInfo->pRes;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId < 0) {
memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity));
}
}
}
void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
......@@ -3221,23 +3280,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
for (int32_t j = 0; j < numOfOutput; ++j) {
if (pCtx[j].functionId < 0) {
int32_t output = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
if (pRuntimeEnv->pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
} else {
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init);
}
}
// set the output value exist
pCtx[j].resultInfo->numOfRes = output;
if (output > 0) {
pCtx[j].resultInfo->hasResult = DATA_SET_FLAG;
}
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
}
......@@ -3254,23 +3297,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
} else {
for (int32_t j = 0; j < numOfOutput; ++j) {
if (pCtx[j].functionId < 0) {
int32_t output = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
void *interBuf = (void *)GET_ROWCELL_INTERBUF(pResInfo);
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
if (pRuntimeEnv->pUdfInfo->isScript) {
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
} else {
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, interBuf, &output, &pRuntimeEnv->pUdfInfo->init);
}
}
// set the output value exist
pCtx[j].resultInfo->numOfRes = output;
if (output > 0) {
pCtx[j].resultInfo->hasResult = DATA_SET_FLAG;
}
doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
}
......@@ -4639,7 +4666,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param) {
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->resultRowFactor =
......@@ -4650,6 +4677,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo->pMerge = param;
pInfo->bufCapacity = 4096;
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
......@@ -4916,7 +4944,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
}
// Return result of the previous group in the firstly.
if (newgroup && pRes->info.rows > 0) {
if (*newgroup && pRes->info.rows > 0) {
pArithInfo->existDataBlock = pBlock;
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
......
......@@ -138,6 +138,12 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
} else { // diff/add/multiply/subtract/division
op = OP_Arithmetic;
taosArrayPush(plan, &op);
//arithmetic on scalar function
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
}
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
......
......@@ -678,7 +678,6 @@ if $data61 != 8 then
return -1
endi
sql select add_one(f1)+1 from stb1;
if $rows != 17 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册