Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a1ba80b4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a1ba80b4
编写于
5月 13, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
5月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12451 from taosdata/feature/udf
feat: enhance udf scalar function with function handle map
上级
095217a2
d97030af
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
44 addition
and
14 deletion
+44
-14
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+2
-0
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+42
-14
未找到文件。
source/libs/scalar/inc/sclInt.h
浏览文件 @
a1ba80b4
...
...
@@ -27,11 +27,13 @@ typedef struct SScalarCtx {
SArray
*
pBlockList
;
/* element is SSDataBlock* */
SHashObj
*
pRes
;
/* element is SScalarParam */
void
*
param
;
// additional parameter (meta actually) for acquire value such as tbname/tags values
SHashObj
*
udf2Handle
;
}
SScalarCtx
;
#define SCL_DATA_TYPE_DUMMY_HASH 9000
#define SCL_DEFAULT_OP_NUM 10
#define SCL_DEFAULT_UDF_NUM 8
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
a1ba80b4
...
...
@@ -153,6 +153,18 @@ void sclFreeRes(SHashObj *res) {
taosHashCleanup
(
res
);
}
void
sclFreeUdfHandles
(
SHashObj
*
udf2handle
)
{
void
*
pIter
=
taosHashIterate
(
udf2handle
,
NULL
);
while
(
pIter
)
{
UdfcFuncHandle
*
handle
=
(
UdfcFuncHandle
*
)
pIter
;
if
(
handle
)
{
teardownUdf
(
*
handle
);
}
pIter
=
taosHashIterate
(
udf2handle
,
pIter
);
}
taosHashCleanup
(
udf2handle
);
}
void
sclFreeParam
(
SScalarParam
*
param
)
{
if
(
param
->
columnData
!=
NULL
)
{
colDataDestroy
(
param
->
columnData
);
...
...
@@ -362,18 +374,24 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
if
(
fmIsUserDefinedFunc
(
node
->
funcId
))
{
UdfcFuncHandle
udfHandle
=
NULL
;
code
=
setupUdf
(
node
->
functionName
,
&
udfHandle
);
if
(
code
!=
0
)
{
sclError
(
"fmExecFunction error. setupUdf. function name: %s, code:%d"
,
node
->
functionName
,
code
);
goto
_return
;
char
*
udfName
=
node
->
functionName
;
if
(
ctx
->
udf2Handle
)
{
UdfcFuncHandle
*
pHandle
=
taosHashGet
(
ctx
->
udf2Handle
,
udfName
,
strlen
(
udfName
));
if
(
pHandle
)
{
udfHandle
=
*
pHandle
;
}
code
=
callUdfScalarFunc
(
udfHandle
,
params
,
paramNum
,
output
);
}
if
(
udfHandle
==
NULL
)
{
code
=
setupUdf
(
udfName
,
&
udfHandle
);
if
(
code
!=
0
)
{
sclError
(
"fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d"
,
node
->
function
Name
,
code
);
sclError
(
"fmExecFunction error. setupUdf. function name: %s, code:%d"
,
udf
Name
,
code
);
goto
_return
;
}
code
=
teardownUdf
(
udfHandle
);
if
(
ctx
->
udf2Handle
)
{
taosHashPut
(
ctx
->
udf2Handle
,
udfName
,
strlen
(
udfName
),
&
udfHandle
,
sizeof
(
UdfcFuncHandle
));
}
}
code
=
callUdfScalarFunc
(
udfHandle
,
params
,
paramNum
,
output
);
if
(
code
!=
0
)
{
sclError
(
"fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d"
,
node
->
functionName
,
code
);
goto
_return
;
...
...
@@ -891,15 +909,20 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
SScalarCtx
ctx
=
{
0
};
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
sclError
(
"taosHashInit failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
sclError
(
"taosHashInit result map failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
ctx
.
udf2Handle
=
taosHashInit
(
SCL_DEFAULT_UDF_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
udf2Handle
)
{
sclError
(
"taosHashInit udf to handle map failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
nodesRewriteExprPostOrder
(
&
pNode
,
sclConstantsRewriter
,
(
void
*
)
&
ctx
);
SCL_ERR_JRET
(
ctx
.
code
);
*
pRes
=
pNode
;
_return:
sclFreeUdfHandles
(
ctx
.
udf2Handle
);
sclFreeRes
(
ctx
.
pRes
);
return
code
;
}
...
...
@@ -915,10 +938,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
// TODO: OPT performance
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
sclError
(
"taosHashInit failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
sclError
(
"taosHashInit result map failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
ctx
.
udf2Handle
=
taosHashInit
(
SCL_DEFAULT_UDF_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
udf2Handle
)
{
sclError
(
"taosHashInit udf to handle map failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
nodesWalkExprPostOrder
(
pNode
,
sclCalcWalker
,
(
void
*
)
&
ctx
);
SCL_ERR_JRET
(
ctx
.
code
);
...
...
@@ -936,6 +963,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
_return:
//nodesDestroyNode(pNode);
sclFreeUdfHandles
(
ctx
.
udf2Handle
);
sclFreeRes
(
ctx
.
pRes
);
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录