--- sidebar_label: 用户定义函数 title: UDF(用户定义函数) description: "支持用户编码的聚合函数和标量函数,在查询中嵌入并使用用户定义函数,拓展查询的能力和功能。" --- 在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF(User Defined Function) 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。 TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。 用户可以通过 UDF 实现两类函数:标量函数和聚合函数。标量函数对每行数据输出一个值,如求绝对值 abs,正弦函数 sin,字符串拼接函数 concat 等。聚合函数对多行数据进行输出一个值,如求平均数 avg,最大值 max 等。 实现 UDF 时,需要实现规定的接口函数 - 标量函数需要实现标量接口函数 scalarfn 。 - 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。 - 如果需要初始化,实现 udf_init;如果需要清理工作,实现udf_destroy。 接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。 ## 实现标量函数 标量函数实现模板如下 ```c #include "taos.h" #include "taoserror.h" #include "taosudf.h" // initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix // @return error number defined in taoserror.h int32_t scalarfn_init() { // initialization. return TSDB_CODE_SUCCESS; } // scalar function main computation function // @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn // @param resultColumn, output column // @return error number defined in taoserror.h int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) { // read data from inputDataBlock and process, then output to resultColumn. return TSDB_CODE_SUCCESS; } // cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix. // @return error number defined in taoserror.h int32_t scalarfn_destroy() { // clean up return TSDB_CODE_SUCCESS; } ``` scalarfn 为函数名的占位符,需要替换成函数名,如bit_and。 ## 实现聚合函数 聚合函数的实现模板如下 ```c #include "taos.h" #include "taoserror.h" #include "taosudf.h" // Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix // @return error number defined in taoserror.h int32_t aggfn_init() { // initialization. return TSDB_CODE_SUCCESS; } // aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. The function name shall be concatenation of udf name and _start suffix // @param interbuf intermediate value to initialize // @return error number defined in taoserror.h int32_t aggfn_start(SUdfInterBuf* interBuf) { // initialize intermediate value in interBuf return TSDB_CODE_SUCCESS; } // aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf). // @param inputBlock input data block // @param interBuf old state // @param newInterBuf new state // @return error number defined in taoserror.h int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { // read from inputBlock and interBuf and output to newInterBuf return TSDB_CODE_SUCCESS; } // aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix. // @interBuf : intermediate value // @result: final result // @return error number defined in taoserror.h int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) { // read data from inputDataBlock and process, then output to result return TSDB_CODE_SUCCESS; } // cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix. // @return error number defined in taoserror.h int32_t aggfn_destroy() { // clean up return TSDB_CODE_SUCCESS; } ``` aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。 ## 接口函数定义 接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀(_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfn,aggfn, udf 需要替换成udf函数名。 接口函数返回值表示是否成功。如果返回值是 TSDB_CODE_SUCCESS,表示操作成功,否则返回的是错误代码。错误代码定义在 taoserror.h,和 taos.h 中的API共享错误码的定义。例如, TSDB_CODE_UDF_INVALID_INPUT 表示输入无效输入。TSDB_CODE_OUT_OF_MEMORY 表示内存不足。 接口函数参数类型见数据结构定义。 ### 标量接口函数 `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)` 其中 scalarFn 是函数名的占位符。这个函数对数据块进行标量计算,通过设置resultColumn结构体中的变量设置值 参数的具体含义是: - inputDataBlock: 输入的数据块 - resultColumn: 输出列 ### 聚合接口函数 `int32_t aggfn_start(SUdfInterBuf *interBuf)` `int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)` `int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)` 其中 aggfn 是函数名的占位符。首先调用aggfn_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个数据块调用 aggfn 用数据块更新中间结果,最后再调用 aggfn_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。 参数的具体含义是: - interBuf:中间结果 buffer。 - inputBlock:输入的数据块。 - newInterBuf:新的中间结果buffer。 - result:最终结果。 ### UDF 初始化和销毁 `int32_t udf_init()` `int32_t udf_destroy()` 其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作,无需定义udf_init函数。如果没有清理工作,无需定义udf_destroy函数。 ## UDF 数据结构 ```c typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; uint8_t precision; uint8_t scale; } SUdfColumnMeta; typedef struct SUdfColumnData { int32_t numOfRows; int32_t rowsAlloc; union { struct { int32_t nullBitmapLen; char *nullBitmap; int32_t dataLen; char *data; } fixLenCol; struct { int32_t varOffsetsLen; int32_t *varOffsets; int32_t payloadLen; char *payload; int32_t payloadAllocLen; } varLenCol; }; } SUdfColumnData; typedef struct SUdfColumn { SUdfColumnMeta colMeta; bool hasNull; SUdfColumnData colData; } SUdfColumn; typedef struct SUdfDataBlock { int32_t numOfRows; int32_t numOfCols; SUdfColumn **udfCols; } SUdfDataBlock; typedef struct SUdfInterBuf { int32_t bufLen; char* buf; int8_t numOfResult; //zero or one } SUdfInterBuf; ``` 数据结构说明如下: - SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 <= i <= numCols-1)表示每一列数据,类型为SUdfColumn*。 - SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。 - SUdfColumnMeta 成员定义同 taos.h 数据类型定义。 - SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。 - SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult 为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。 ## 编译 UDF 用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。 例如,按照上一章节描述的规则准备好了用户定义函数的源代码 bit_and.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件: ```bash gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so ``` 这样就准备好了动态链接库 libbitand.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。 ## 管理和使用UDF 编译好的UDF,还需要将其加入到系统才能被正常的SQL调用。关于如何管理和使用UDF,参见[UDF使用说明](../12-taos-sql/26-udf.md) ## 示例代码 ### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c) bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。
bit_and.c ```c {{#include tests/script/sh/bit_and.c}} ```
### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c) l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
l2norm.c ```c {{#include tests/script/sh/l2norm.c}} ```
### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/max_vol.c) max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值 创建表: ```bash create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16)); ``` 创建自定义函数: ```bash create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C'; ``` 使用自定义函数: ```bash select max_vol(vol1,vol2,vol3,deviceid) from battery; ```
max_vol.c ```c {{#include tests/script/sh/max_vol.c}} ```