提交 f96fbcb6 编写于 作者: D dapan1121

support udf in nested query

上级 e9ec776c
...@@ -8564,6 +8564,11 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -8564,6 +8564,11 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
info.functionId = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) * (-1) - 1;; info.functionId = (int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) * (-1) - 1;;
taosArrayPush(pQueryInfo->pUdfInfo, &info); taosArrayPush(pQueryInfo->pUdfInfo, &info);
if ((int32_t)taosArrayGetSize(pQueryInfo->pUdfInfo) > 1) {
code = tscInvalidOperationMsg(tscGetErrorMsgPayload(pCmd), "only one udf allowed in one sql", NULL);
goto _end;
}
} }
} }
} }
......
...@@ -1271,6 +1271,25 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1271,6 +1271,25 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
.pGroupList = taosArrayInit(1, POINTER_BYTES), .pGroupList = taosArrayInit(1, POINTER_BYTES),
}; };
SUdfInfo* pUdfInfo = NULL;
size_t size = tscNumOfExprs(px);
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExprInfo = tscExprGet(px, j);
int32_t functionId = pExprInfo->base.functionId;
if (functionId < 0) {
pUdfInfo = taosArrayGet(px->pUdfInfo, -1 * functionId - 1);
int32_t code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
return;
}
break;
}
}
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
...@@ -1352,6 +1371,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1352,6 +1371,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self); tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self); px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self);
px->pQInfo->runtimeEnv.udfIsCopy = true;
px->pQInfo->runtimeEnv.pUdfInfo = pUdfInfo;
tfree(pColumnInfo); tfree(pColumnInfo);
tfree(schema); tfree(schema);
......
...@@ -313,6 +313,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -313,6 +313,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj *pTableRetrieveTsMap;
SUdfInfo *pUdfInfo; SUdfInfo *pUdfInfo;
bool udfIsCopy;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
enum { enum {
......
...@@ -2408,7 +2408,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2408,7 +2408,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->sasArray);
} }
if (!pRuntimeEnv->udfIsCopy) {
destroyUdfInfo(pRuntimeEnv->pUdfInfo); destroyUdfInfo(pRuntimeEnv->pUdfInfo);
}
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
doFreeQueryHandle(pRuntimeEnv); doFreeQueryHandle(pRuntimeEnv);
...@@ -8038,7 +8040,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) { ...@@ -8038,7 +8040,7 @@ static char* getUdfFuncName(char* funcname, char* name, int type) {
} }
int32_t initUdfInfo(SUdfInfo* pUdfInfo) { int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) { if (pUdfInfo == NULL || pUdfInfo->handle) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//qError("script len: %d", pUdfInfo->contLen); //qError("script len: %d", pUdfInfo->contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册