09-udf.md 14.4 KB
Newer Older
1 2 3 4 5 6
---
sidebar_label: 用户定义函数
title: UDF(用户定义函数)
description: "支持用户编码的聚合函数和标量函数,在查询中嵌入并使用用户定义函数,拓展查询的能力和功能。"
---

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
7
在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF(User Defined Function) 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。
8

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
9
用户可以通过 UDF 实现两类函数:标量函数和聚合函数。标量函数对每行数据输出一个值,如求绝对值 abs,正弦函数 sin,字符串拼接函数 concat 等。聚合函数对多行数据进行输出一个值,如求平均数 avg,最大值 max 等。
S
shenglian zhou 已提交
10

S
shenglian zhou 已提交
11 12
TDengine 支持通过 C/Python 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。

G
gccgdb1234 已提交
13
## 用 C 语言实现 UDF
S
shenglian zhou 已提交
14 15

使用 C 语言实现 UDF 时,需要实现规定的接口函数
S
shenglian zhou 已提交
16
- 标量函数需要实现标量接口函数 scalarfn 。
S
shenglian zhou 已提交
17
- 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。
S
shenglian zhou 已提交
18
- 如果需要初始化,实现 udf_init;如果需要清理工作,实现udf_destroy。
S
shenglian zhou 已提交
19 20

接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。
21

G
gccgdb1234 已提交
22
### 用 C 语言实现标量函数
S
shenglian zhou 已提交
23
标量函数实现模板如下
S
shenglian zhou 已提交
24 25 26 27
```c
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"
28

S
shenglian zhou 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
// @return error number defined in taoserror.h
int32_t scalarfn_init() {
    // initialization.
    return TSDB_CODE_SUCCESS;
}

// scalar function main computation function
// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
// @param resultColumn, output column
// @return error number defined in taoserror.h
int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
    // read data from inputDataBlock and process, then output to resultColumn.
    return TSDB_CODE_SUCCESS;
}

// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t scalarfn_destroy() {
    // clean up
    return TSDB_CODE_SUCCESS;
}
```
S
shenglian zhou 已提交
52 53
scalarfn 为函数名的占位符,需要替换成函数名,如bit_and。

G
gccgdb1234 已提交
54
### 用 C 语言实现聚合函数
S
shenglian zhou 已提交
55 56

聚合函数的实现模板如下
S
shenglian zhou 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69
```c
#include "taos.h"
#include "taoserror.h"
#include "taosudf.h"

// Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
// @return error number defined in taoserror.h
int32_t aggfn_init() {
    // initialization.
    return TSDB_CODE_SUCCESS;
}

// aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. The function name shall be concatenation of udf name and _start suffix
sangshuduo's avatar
sangshuduo 已提交
70
// @param interbuf intermediate value to initialize
S
shenglian zhou 已提交
71 72
// @return error number defined in taoserror.h
int32_t aggfn_start(SUdfInterBuf* interBuf) {
S
shenglian zhou 已提交
73
    // initialize intermediate value in interBuf
sangshuduo's avatar
sangshuduo 已提交
74
    return TSDB_CODE_SUCCESS;
S
shenglian zhou 已提交
75 76 77
}

// aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf).
S
shenglian zhou 已提交
78 79 80 81
// @param inputBlock input data block
// @param interBuf old state
// @param newInterBuf new state
// @return error number defined in taoserror.h
S
shenglian zhou 已提交
82 83 84 85
int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
    // read from inputBlock and interBuf and output to newInterBuf
    return TSDB_CODE_SUCCESS;
}
86

S
shenglian zhou 已提交
87
// aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix.
S
shenglian zhou 已提交
88 89
// @interBuf : intermediate value
// @result: final result
S
shenglian zhou 已提交
90 91
// @return error number defined in taoserror.h
int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
S
shenglian zhou 已提交
92
    // read data from inputDataBlock and process, then output to result
S
shenglian zhou 已提交
93 94 95 96 97 98 99 100 101 102
    return TSDB_CODE_SUCCESS;
}

// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
// @return error number defined in taoserror.h
int32_t aggfn_destroy() {
    // clean up
    return TSDB_CODE_SUCCESS;
}
```
S
shenglian zhou 已提交
103
aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
S
shenglian zhou 已提交
104

G
gccgdb1234 已提交
105
### C 语言 UDF 接口函数定义
S
shenglian zhou 已提交
106

S
shenglian zhou 已提交
107
接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀(_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfn,aggfn, udf 需要替换成udf函数名。
S
shenglian zhou 已提交
108

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109
接口函数返回值表示是否成功。如果返回值是 TSDB_CODE_SUCCESS,表示操作成功,否则返回的是错误代码。错误代码定义在 taoserror.h,和 taos.h 中的API共享错误码的定义。例如, TSDB_CODE_UDF_INVALID_INPUT 表示输入无效输入。TSDB_CODE_OUT_OF_MEMORY 表示内存不足。
S
shenglian zhou 已提交
110 111

接口函数参数类型见数据结构定义。
S
shenglian zhou 已提交
112

G
gccgdb1234 已提交
113
#### 标量函数接口
114

S
shenglian zhou 已提交
115
 `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)` 
116
 
S
shenglian zhou 已提交
117
 其中 scalarFn 是函数名的占位符。这个函数对数据块进行标量计算,通过设置resultColumn结构体中的变量设置值
118

S
shenglian zhou 已提交
119
参数的具体含义是:
120
  - inputDataBlock: 输入的数据块
W
wade zhang 已提交
121
  - resultColumn: 输出列 
122

G
gccgdb1234 已提交
123
#### 聚合函数接口
124

S
shenglian zhou 已提交
125
`int32_t aggfn_start(SUdfInterBuf *interBuf)`
126

S
shenglian zhou 已提交
127
`int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
128

S
shenglian zhou 已提交
129
`int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
130

S
shenglian zhou 已提交
131 132 133
其中 aggfn 是函数名的占位符。首先调用aggfn_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个数据块调用 aggfn 用数据块更新中间结果,最后再调用 aggfn_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。

参数的具体含义是:
134 135 136 137
  - interBuf:中间结果 buffer。
  - inputBlock:输入的数据块。
  - newInterBuf:新的中间结果buffer。
  - result:最终结果。
138 139


G
gccgdb1234 已提交
140
#### 初始化和销毁接口
141
`int32_t udf_init()`
142

143
`int32_t udf_destroy()`
144

S
shenglian zhou 已提交
145
其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作,无需定义udf_init函数。如果没有清理工作,无需定义udf_destroy函数。
146 147


G
gccgdb1234 已提交
148
### C 语言 UDF 数据结构
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
```c
typedef struct SUdfColumnMeta {
  int16_t type;
  int32_t bytes;
  uint8_t precision;
  uint8_t scale;
} SUdfColumnMeta;

typedef struct SUdfColumnData {
  int32_t numOfRows;
  int32_t rowsAlloc;
  union {
    struct {
      int32_t nullBitmapLen;
      char   *nullBitmap;
      int32_t dataLen;
      char   *data;
    } fixLenCol;

    struct {
      int32_t varOffsetsLen;
      int32_t   *varOffsets;
      int32_t payloadLen;
      char   *payload;
      int32_t payloadAllocLen;
    } varLenCol;
  };
} SUdfColumnData;

typedef struct SUdfColumn {
  SUdfColumnMeta colMeta;
  bool           hasNull;
  SUdfColumnData colData;
} SUdfColumn;

typedef struct SUdfDataBlock {
  int32_t numOfRows;
  int32_t numOfCols;
  SUdfColumn **udfCols;
} SUdfDataBlock;

typedef struct SUdfInterBuf {
  int32_t bufLen;
  char* buf;
  int8_t numOfResult; //zero or one
} SUdfInterBuf;
```
S
shenglian zhou 已提交
196 197 198
数据结构说明如下:

- SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 <= i <= numCols-1)表示每一列数据,类型为SUdfColumn*
S
shenglian zhou 已提交
199
- SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。
S
shenglian zhou 已提交
200
- SUdfColumnMeta 成员定义同 taos.h 数据类型定义。
S
shenglian zhou 已提交
201 202
- SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。 
- SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult
203 204 205

为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。

G
gccgdb1234 已提交
206
### 编译 C UDF
207 208 209

用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。

210
例如,按照上一章节描述的规则准备好了用户定义函数的源代码 bit_and.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件:
211 212

```bash
213
gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
214 215
```

216
这样就准备好了动态链接库 libbitand.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
217

G
gccgdb1234 已提交
218
### C UDF 示例代码
219

A
Alex Duan 已提交
220
#### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c)
221

S
shenglian zhou 已提交
222 223
bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。

224
<details>
S
shenglian zhou 已提交
225
<summary>bit_and.c</summary>
226 227

```c
S
shenglian zhou 已提交
228
{{#include tests/script/sh/bit_and.c}}
229 230 231 232
```

</details>

A
Alex Duan 已提交
233
#### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/l2norm.c)
234

S
shenglian zhou 已提交
235
l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
S
shenglian zhou 已提交
236

237
<details>
S
shenglian zhou 已提交
238
<summary>l2norm.c</summary>
239 240

```c
S
shenglian zhou 已提交
241
{{#include tests/script/sh/l2norm.c}}
242 243 244
```

</details>
245

A
Alex Duan 已提交
246
#### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/max_vol.c)
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269

max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值

创建表:
```bash
create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16));
```
创建自定义函数:
```bash
create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C'; 
```
使用自定义函数:
```bash
select max_vol(vol1,vol2,vol3,deviceid) from battery;
```

<details>
<summary>max_vol.c</summary>

```c
{{#include tests/script/sh/max_vol.c}}
```

S
shenglian zhou 已提交
270 271
</details>

G
gccgdb1234 已提交
272 273
## 用 Python 语言实现 UDF

S
shenglian zhou 已提交
274 275 276 277 278
使用 Python 语言实现 UDF 时,需要实现规定的接口函数
- 标量函数需要实现标量接口函数 process 。
- 聚合函数需要实现聚合接口函数 start ,reduce ,finish。
- 如果需要初始化,实现 init;如果需要清理工作,实现 destroy。

G
gccgdb1234 已提交
279
### 用 Python 实现标量函数
S
shenglian zhou 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292

标量函数实现模版如下
```Python
def init():
    # initialization
def destroy():
    # destroy
def process(input: datablock) -> tuple[output_type]:
    # process input datablock, 
    # datablock.data(row, col) is to access the python object in location(row,col)
    # return tuple object consisted of object of type outputtype   
```

G
gccgdb1234 已提交
293
### 用 Python 实现聚合函数
S
shenglian zhou 已提交
294 295 296 297 298 299 300 301 302 303 304 305

聚合函数实现模版如下
```Python
def init():
    #initialization
def destroy():
    #destroy
def start() -> bytes:
    #return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
    # deserialize buf to state
    # reduce the inputs and state into new_state. 
sangshuduo's avatar
sangshuduo 已提交
306
    # use inputs.data(i,j) to access python object of location(i,j)
S
shenglian zhou 已提交
307 308 309 310 311 312
    # serialize new_state into new_state_bytes
    return new_state_bytes   
def finish(buf: bytes) -> output_type:
    #return obj of type outputtype   
```

G
gccgdb1234 已提交
313
### Python UDF 接口函数定义
S
shenglian zhou 已提交
314

G
gccgdb1234 已提交
315
#### 标量函数接口
S
shenglian zhou 已提交
316 317 318 319 320 321
```Python
def process(input: datablock) -> tuple[output_type]:
```
- input:datablock 类似二维矩阵,通过成员方法 data(row,col)返回位于 row 行,col 列的 python 对象
- 返回值是一个 Python 对象元组,每个元素类型为输出类型。

G
gccgdb1234 已提交
322
#### 聚合函数接口
S
shenglian zhou 已提交
323 324 325 326 327 328 329 330 331
```Python
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
```

首先调用 start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出,最终输出只能含 0 或 1 条数据。


G
gccgdb1234 已提交
332
#### 初始化和销毁接口
S
shenglian zhou 已提交
333 334 335 336 337
```Python
def init()
def destroy()
```

S
shenglian zhou 已提交
338
其中 init 完成初始化工作。 destroy 完成清理工作。
S
shenglian zhou 已提交
339

G
gccgdb1234 已提交
340
### Python 和 TDengine之间的数据类型映射
341 342 343

下表描述了TDengine SQL数据类型和Python数据类型的映射。任何类型的NULL值都映射成Python的None值。

S
shenglian zhou 已提交
344 345 346 347 348 349 350 351 352 353
|  **TDengine SQL数据类型**   | **Python数据类型** |
| :-----------------------: | ------------ |
|TINYINT / SMALLINT / INT  / BIGINT     | int   |
|TINYINT UNSIGNED / SMALLINT UNSIGNED / INT UNSIGNED / BIGINT UNSIGNED | int |
|FLOAT / DOUBLE | float |
|BOOL | bool |
|BINARY / VARCHAR / NCHAR | bytes|
|TIMESTAMP | int |
|JSON and other types | 不支持 |

G
gccgdb1234 已提交
354
### Python UDF 环境的安装
S
shenglian zhou 已提交
355 356
1. 安装 taospyudf 包。此包执行Python UDF程序。
```bash
357 358
sudo pip install taospyudf
ldconfig
S
shenglian zhou 已提交
359
```
S
slzhou 已提交
360
2. 如果 Python UDF 程序执行时,通过 PYTHONPATH 引用其它的包,可以设置 taos.cfg 的 UdfdLdLibPath 变量为PYTHONPATH的内容
S
shenglian zhou 已提交
361
 
G
gccgdb1234 已提交
362
### Python UDF 示例代码
363
#### 标量函数示例 [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
S
shenglian zhou 已提交
364

S
slzhou 已提交
365
pybitand 实现多列的按位与功能。如果只有一列,返回这一列。pybitand 忽略空值。
S
shenglian zhou 已提交
366 367 368 369 370 371 372 373 374 375

<details>
<summary>pybitand.py</summary>

```Python
{{#include tests/script/sh/pybitand.py}}
```

</details>

G
gccgdb1234 已提交
376
#### 聚合函数示例 [pyl2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pyl2norm.py)
S
shenglian zhou 已提交
377 378 379 380 381 382 383 384 385 386 387 388

pyl2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。

<details>
<summary>pyl2norm.py</summary>

```c
{{#include tests/script/sh/pyl2norm.py}}
```

</details>

S
shenglian zhou 已提交
389 390 391 392 393 394 395 396 397 398 399
### 聚合函数示例 [pycumsum](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pycumsum.py)

pycumsum 使用 numpy 计算输入列所有数据的累积和。
<details>
<summary>pycumsum.py</summary>

```c
{{#include tests/script/sh/pycumsum.py}}
```

</details>
G
gccgdb1234 已提交
400
## 管理和使用 UDF
401 402
在使用 UDF 之前需要先将其加入到 TDengine 系统中。关于如何管理和使用 UDF,请参考[管理和使用 UDF](../12-taos-sql/26-udf.md)