Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2f27d015
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2f27d015
编写于
8月 04, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: get result row add forUpdate parameter
上级
ecb695e8
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
80 addition
and
54 deletion
+80
-54
docs/zh/07-develop/09-udf.md
docs/zh/07-develop/09-udf.md
+70
-46
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-3
未找到文件。
docs/zh/07-develop/09-udf.md
浏览文件 @
2f27d015
...
...
@@ -16,72 +16,96 @@ description: "支持用户编码的聚合函数和标量函数,在查询中嵌
用户可以按照下列函数模板定义自己的标量计算函数
`
void udfNormalFunc(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf
)`
`
int32_t udf(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn
)`
其中 udfNormalFunc 是函数名的占位符,以上述模板实现的函数对行数据块进行标量计算,其参数项是固定的,用于按照约束完成与引擎之间的数据交换。
-
udfNormalFunc 中各参数的具体含义是:
-
data:输入数据。
-
itype:输入数据的类型。这里采用的是短整型表示法,与各种数据类型对应的值可以参见
[
column_meta 中的列类型说明
](
/reference/rest-api/
)
。例如 4 用于表示 INT 型。
-
iBytes:输入数据中每个值会占用的字节数。
-
numOfRows:输入数据的总行数。
-
ts:主键时间戳在输入中的列数据(只读)。
-
dataOutput:输出数据的缓冲区,缓冲区大小为用户指定的输出类型大小
\*
numOfRows。
-
interBuf:中间计算结果的缓冲区,大小为用户在创建 UDF 时指定的 BUFSIZE 大小。通常用于计算中间结果与最终结果不一致时使用,由引擎负责分配与释放。
-
tsOutput:主键时间戳在输出时的列数据,如果非空可用于输出结果对应的时间戳。
-
numOfOutput:输出结果的个数(行数)。
-
oType:输出数据的类型。取值含义与 itype 参数一致。
-
oBytes:输出数据中每个值占用的字节数。
-
buf:用于在 UDF 与引擎间的状态控制信息传递块。
[
add_one.c
](
https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c
)
是结构最简单的 UDF 实现,也即上面定义的 udfNormalFunc 函数的一个具体实现。其功能为:对传入的一个数据列(可能因 WHERE 子句进行了筛选)中的每一项,都输出 +1 之后的值,并且要求输入的列数据类型为 INT。
其中 udf 是函数名的占位符,以上述模板实现的函数对行数据块进行标量计算。
-
scalarFunction 中各参数的具体含义是:
-
inputDataBlock: 输入的数据块
-
resultColumn: 输出列
### 聚合函数
用户可以按照如下函数模板定义自己的聚合函数。
`void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf)`
其中 udfMergeFunc 是函数名的占位符,以上述模板实现的函数用于对计算中间结果进行聚合,只有针对超级表的聚合查询才需要调用该函数。其中各参数的具体含义是:
-
data:udfNormalFunc 的输出数据数组,如果使用了 interBuf 那么 data 就是 interBuf 的数组。
-
numOfRows:data 中数据的行数。
-
dataOutput:输出数据的缓冲区,大小等于一条最终结果的大小。如果此时输出还不是最终结果,可以选择输出到 interBuf 中即 data 中。
-
numOfOutput:输出结果的个数(行数)。
-
buf:用于在 UDF 与引擎间的状态控制信息传递块。
[
abs_max.c
](
https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c
)
实现的是一个聚合函数,功能是对一组数据按绝对值取最大值。
其计算过程为:与所在查询语句相关的数据会被分为多个行数据块,对每个行数据块调用 udfNormalFunc(在本例的实现代码中,实际函数名是
`abs_max`
)来生成每个子表的中间结果,再将子表的中间结果调用 udfMergeFunc(本例中,其实际的函数名是
`abs_max_merge`
)进行聚合,生成超级表的最终聚合结果或中间结果。聚合查询最后还会通过 udfFinalizeFunc(本例中,其实际的函数名是
`abs_max_finalize`
)再把超级表的中间结果处理为最终结果,最终结果只能含 0 或 1 条结果数据。
`int32_t udf_start(SUdfInterBuf *interBuf)`
其他典型场景,如协方差的计算,也可通过定义聚合 UDF 的方式实现。
`int32_t udf(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
### 最终计算
`int32_t udf_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
其中 udf 是函数名的占位符。其中各参数的具体含义是:
用户可以按下面的函数模板实现自己的函数对计算结果进行最终计算,通常用于有 interBuf 使用的场景。
-
interBuf:中间结果 buffer。
-
inputBlock:输入的数据块。
-
newInterBuf:新的中间结果buffer。
-
result:最终结果。
`void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf)`
其中 udfFinalizeFunc 是函数名的占位符 ,其中各参数的具体含义是:
-
dataOutput:输出数据的缓冲区。
-
interBuf:中间结算结果缓冲区,可作为输入。
-
numOfOutput:输出数据的个数,对聚合函数来说只能是 0 或者 1。
-
buf:用于在 UDF 与引擎间的状态控制信息传递块。
其计算过程为:首先调用udf_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个行数据块调用 udf 用数据块更新中间结果,最后再调用 udf_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。
## UDF 实现方式的规则总结
### UDF 初始化和销毁
`int32_t udf_init()`
三类 UDF 函数: udfNormalFunc、udfMergeFunc、udfFinalizeFunc ,其函数名约定使用相同的前缀,此前缀即 udfNormalFunc 的实际函数名,也即 udfNormalFunc 函数不需要在实际函数名后添加后缀;而udfMergeFunc 的函数名要加上后缀
`_merge`
、udfFinalizeFunc 的函数名要加上后缀
`_finalize`
,这是 UDF 实现规则的一部分,系统会按照这些函数名后缀来调用相应功能。
`int32_t udf_destroy()`
根据 UDF 函数类型的不同,用户所要实现的功能函数也不同:
-
标量函数:UDF 中需实现 udfNormalFunc。
-
聚合函数:UDF 中需实现 udfNormalFunc、udfMergeFunc(对超级表查询)、udfFinalizeFunc。
其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。
:::note
如果对应的函数不需要具体的功能,也需要实现一个空函数。
:::
### UDF 数据结构
```
c
typedef
struct
SUdfColumnMeta
{
int16_t
type
;
int32_t
bytes
;
uint8_t
precision
;
uint8_t
scale
;
}
SUdfColumnMeta
;
typedef
struct
SUdfColumnData
{
int32_t
numOfRows
;
int32_t
rowsAlloc
;
union
{
struct
{
int32_t
nullBitmapLen
;
char
*
nullBitmap
;
int32_t
dataLen
;
char
*
data
;
}
fixLenCol
;
struct
{
int32_t
varOffsetsLen
;
int32_t
*
varOffsets
;
int32_t
payloadLen
;
char
*
payload
;
int32_t
payloadAllocLen
;
}
varLenCol
;
};
}
SUdfColumnData
;
typedef
struct
SUdfColumn
{
SUdfColumnMeta
colMeta
;
bool
hasNull
;
SUdfColumnData
colData
;
}
SUdfColumn
;
typedef
struct
SUdfDataBlock
{
int32_t
numOfRows
;
int32_t
numOfCols
;
SUdfColumn
**
udfCols
;
}
SUdfDataBlock
;
typedef
struct
SUdfInterBuf
{
int32_t
bufLen
;
char
*
buf
;
int8_t
numOfResult
;
//zero or one
}
SUdfInterBuf
;
```
为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。
## 编译 UDF
用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。
...
...
source/libs/executor/inc/executil.h
浏览文件 @
2f27d015
...
...
@@ -88,9 +88,11 @@ bool isResultRowClosed(SResultRow* pResultRow);
struct
SResultRowEntryInfo
*
getResultEntryInfo
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
);
static
FORCE_INLINE
SResultRow
*
getResultRowByPos
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
)
{
static
FORCE_INLINE
SResultRow
*
getResultRowByPos
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
,
bool
forUpdate
)
{
SFilePage
*
bufPage
=
(
SFilePage
*
)
getBufPage
(
pBuf
,
pos
->
pageId
);
setBufPageDirty
(
bufPage
,
true
);
if
(
forUpdate
)
{
setBufPageDirty
(
bufPage
,
true
);
}
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
2f27d015
...
...
@@ -953,7 +953,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
return
w
;
}
w
=
getResultRowByPos
(
pBuf
,
&
pResultRowInfo
->
cur
)
->
win
;
w
=
getResultRowByPos
(
pBuf
,
&
pResultRowInfo
->
cur
,
false
)
->
win
;
// in case of typical time window, we can calculate time window directly.
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
2f27d015
...
...
@@ -258,7 +258,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// in case of repeat scan/reverse scan, no new time window added.
if
(
isIntervalQuery
)
{
if
(
masterscan
&&
p1
!=
NULL
)
{
// the *p1 may be NULL in case of sliding+offset exists.
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
true
);
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
else
{
...
...
@@ -266,7 +266,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// pResultRowInfo object.
if
(
p1
!=
NULL
)
{
// todo
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
true
);
ASSERT
(
pResult
->
pageId
==
p1
->
pageId
&&
pResult
->
offset
==
p1
->
offset
);
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
2f27d015
...
...
@@ -611,7 +611,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
break
;
}
SResultRow
*
pr
=
getResultRowByPos
(
pInfo
->
aggSup
.
pResultBuf
,
p1
);
SResultRow
*
pr
=
getResultRowByPos
(
pInfo
->
aggSup
.
pResultBuf
,
p1
,
false
);
ASSERT
(
pr
->
offset
==
p1
->
offset
&&
pr
->
pageId
==
p1
->
pageId
);
if
(
pr
->
closed
)
{
...
...
@@ -1345,7 +1345,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
void
doClearWindowImpl
(
SResultRowPosition
*
p1
,
SDiskbasedBuf
*
pResultBuf
,
SExprSupp
*
pSup
,
int32_t
numOfOutput
)
{
SResultRow
*
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
SResultRow
*
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
,
false
);
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
pCtx
[
i
].
resultInfo
=
getResultEntryInfo
(
pResult
,
i
,
pSup
->
rowEntryInfoOffset
);
...
...
@@ -3481,7 +3481,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
pWinInfo
->
pos
.
pageId
=
(
*
pResult
)
->
pageId
;
pWinInfo
->
pos
.
offset
=
(
*
pResult
)
->
offset
;
}
else
{
*
pResult
=
getResultRowByPos
(
pAggSup
->
pResultBuf
,
&
pWinInfo
->
pos
);
*
pResult
=
getResultRowByPos
(
pAggSup
->
pResultBuf
,
&
pWinInfo
->
pos
,
true
);
if
(
!
(
*
pResult
))
{
qError
(
"getResultRowByPos return NULL, TID:%s"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_FAILED
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录