04-udf.md 18.0 KB
Newer Older
B
Bo Ding 已提交
1
---
B
Bo Ding 已提交
2
sidebar_label: 用户定义函数
D
dingbo 已提交
3
title: UDF(用户定义函数)
B
Bo Ding 已提交
4 5 6 7 8 9
---

在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。

从 2.2.0.0 版本开始,TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。

W
wade zhang 已提交
10
用户可以通过 UDF 实现两类函数: 标量函数 和 聚合函数。
B
Bo Ding 已提交
11

W
wade zhang 已提交
12
## 用 C/C++ 语言来定义 UDF
B
Bo Ding 已提交
13 14 15

### 标量函数

W
wade zhang 已提交
16
用户可以按照下列函数模板定义自己的标量计算函数
B
Bo Ding 已提交
17

W
wade zhang 已提交
18 19 20
 `void udfNormalFunc(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf)` 
 
 其中 udfNormalFunc 是函数名的占位符,以上述模板实现的函数对行数据块进行标量计算,其参数项是固定的,用于按照约束完成与引擎之间的数据交换。
B
Bo Ding 已提交
21 22 23

- udfNormalFunc 中各参数的具体含义是:
  - data:输入数据。
24
  - itype:输入数据的类型。这里采用的是短整型表示法,与各种数据类型对应的值可以参见 [column_meta 中的列类型说明](/reference/restful-api/)。例如 4 用于表示 INT 型。
B
Bo Ding 已提交
25 26 27 28 29 30 31 32 33 34 35
  - iBytes:输入数据中每个值会占用的字节数。
  - numOfRows:输入数据的总行数。
  - ts:主键时间戳在输入中的列数据(只读)。
  - dataOutput:输出数据的缓冲区,缓冲区大小为用户指定的输出类型大小 \* numOfRows。
  - interBuf:中间计算结果的缓冲区,大小为用户在创建 UDF 时指定的 BUFSIZE 大小。通常用于计算中间结果与最终结果不一致时使用,由引擎负责分配与释放。
  - tsOutput:主键时间戳在输出时的列数据,如果非空可用于输出结果对应的时间戳。
  - numOfOutput:输出结果的个数(行数)。
  - oType:输出数据的类型。取值含义与 itype 参数一致。
  - oBytes:输出数据中每个值占用的字节数。
  - buf:用于在 UDF 与引擎间的状态控制信息传递块。

W
wade zhang 已提交
36
  [add_one.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c) 是结构最简单的 UDF 实现,也即上面定义的 udfNormalFunc 函数的一个具体实现。其功能为:对传入的一个数据列(可能因 WHERE 子句进行了筛选)中的每一项,都输出 +1 之后的值,并且要求输入的列数据类型为 INT。
B
Bo Ding 已提交
37

W
wade zhang 已提交
38
### 聚合函数
B
Bo Ding 已提交
39

W
wade zhang 已提交
40
用户可以按照如下函数模板定义自己的聚合函数。
B
Bo Ding 已提交
41

W
wade zhang 已提交
42
`void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf)`
B
Bo Ding 已提交
43

W
wade zhang 已提交
44
其中 udfMergeFunc 是函数名的占位符,以上述模板实现的函数用于对计算中间结果进行聚合,只有针对超级表的聚合查询才需要调用该函数。其中各参数的具体含义是:
B
Bo Ding 已提交
45 46 47 48 49 50 51

  - data:udfNormalFunc 的输出数据数组,如果使用了 interBuf 那么 data 就是 interBuf 的数组。
  - numOfRows:data 中数据的行数。
  - dataOutput:输出数据的缓冲区,大小等于一条最终结果的大小。如果此时输出还不是最终结果,可以选择输出到 interBuf 中即 data 中。
  - numOfOutput:输出结果的个数(行数)。
  - buf:用于在 UDF 与引擎间的状态控制信息传递块。

W
wade zhang 已提交
52
[abs_max.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c) 实现的是一个聚合函数,功能是对一组数据按绝对值取最大值。
B
Bo Ding 已提交
53

W
wade zhang 已提交
54
其计算过程为:与所在查询语句相关的数据会被分为多个行数据块,对每个行数据块调用 udfNormalFunc(在本例的实现代码中,实际函数名是 `abs_max`)来生成每个子表的中间结果,再将子表的中间结果调用 udfMergeFunc(本例中,其实际的函数名是 `abs_max_merge`)进行聚合,生成超级表的最终聚合结果或中间结果。聚合查询最后还会通过 udfFinalizeFunc(本例中,其实际的函数名是 `abs_max_finalize`)再把超级表的中间结果处理为最终结果,最终结果只能含 0 或 1 条结果数据。
B
Bo Ding 已提交
55

W
wade zhang 已提交
56
其他典型场景,如协方差的计算,也可通过定义聚合 UDF 的方式实现。
B
Bo Ding 已提交
57

W
wade zhang 已提交
58
### 最终计算
B
Bo Ding 已提交
59

W
wade zhang 已提交
60
用户可以按下面的函数模板实现自己的函数对计算结果进行最终计算,通常用于有 interBuf 使用的场景。
B
Bo Ding 已提交
61

W
wade zhang 已提交
62
`void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf)`
B
Bo Ding 已提交
63

W
wade zhang 已提交
64 65 66 67
其中 udfFinalizeFunc 是函数名的占位符 ,其中各参数的具体含义是:
  - dataOutput:输出数据的缓冲区。
  - interBuf:中间结算结果缓冲区,可作为输入。
  - numOfOutput:输出数据的个数,对聚合函数来说只能是 0 或者 1。
B
Bo Ding 已提交
68 69
  - buf:用于在 UDF 与引擎间的状态控制信息传递块。

W
wade zhang 已提交
70
## UDF 实现方式的规则总结
B
Bo Ding 已提交
71

W
wade zhang 已提交
72
三类 UDF 函数: udfNormalFunc、udfMergeFunc、udfFinalizeFunc ,其函数名约定使用相同的前缀,此前缀即 udfNormalFunc 的实际函数名,也即 udfNormalFunc 函数不需要在实际函数名后添加后缀;而udfMergeFunc 的函数名要加上后缀 `_merge`、udfFinalizeFunc 的函数名要加上后缀 `_finalize`,这是 UDF 实现规则的一部分,系统会按照这些函数名后缀来调用相应功能。
B
Bo Ding 已提交
73 74 75 76 77 78

根据 UDF 函数类型的不同,用户所要实现的功能函数也不同:

- 标量函数:UDF 中需实现 udfNormalFunc。
- 聚合函数:UDF 中需实现 udfNormalFunc、udfMergeFunc(对超级表查询)、udfFinalizeFunc。

W
wade zhang 已提交
79 80 81 82
:::note
如果对应的函数不需要具体的功能,也需要实现一个空函数。

:::
B
Bo Ding 已提交
83 84 85

## 编译 UDF

G
gccgdb1234 已提交
86
用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。
B
Bo Ding 已提交
87

G
gccgdb1234 已提交
88
例如,按照上一章节描述的规则准备好了用户定义函数的源代码 add_one.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件:
B
Bo Ding 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103

```bash
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
```

这样就准备好了动态链接库 add_one.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。

## 在系统中管理和使用 UDF

### 创建 UDF

用户可以通过 SQL 指令在系统中加载客户端所在主机上的 UDF 函数库(不能通过 RESTful 接口或 HTTP 管理界面来进行这一过程)。一旦创建成功,则当前 TDengine 集群的所有用户都可以在 SQL 指令中使用这些函数。UDF 存储在系统的 MNode 节点上,因此即使重启 TDengine 系统,已经创建的 UDF 也仍然可用。

在创建 UDF 时,需要区分标量函数和聚合函数。如果创建时声明了错误的函数类别,则可能导致通过 SQL 指令调用函数时出错。此外, UDF 支持输入与输出类型不一致,用户需要保证输入数据类型与 UDF 程序匹配,UDF 输出数据类型与 OUTPUTTYPE 匹配。

W
wade zhang 已提交
104 105 106 107
- 创建标量函数
```sql
CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];
```
B
Bo Ding 已提交
108 109 110 111 112 113 114 115 116 117 118 119

  - ids(X):标量函数未来在 SQL 指令中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
  - ids(Y):包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
  - typename(Z):此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
  - B:中间计算结果的缓冲区大小,单位是字节,最小 0,最大 512,如果不使用可以不设置。

  例如,如下语句可以把 add_one.so 创建为系统中可用的 UDF:

  ```sql
  CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT;
  ```

W
wade zhang 已提交
120 121 122 123
- 创建聚合函数:
```sql
CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];
```
B
Bo Ding 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139

  - ids(X):聚合函数未来在 SQL 指令中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
  - ids(Y):包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
  - typename(Z):此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
  - B:中间计算结果的缓冲区大小,单位是字节,最小 0,最大 512,如果不使用可以不设置。

  关于中间计算结果的使用,可以参考示例程序[demo.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c)

  例如,如下语句可以把 demo.so 创建为系统中可用的 UDF:

  ```sql
  CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14;
  ```

### 管理 UDF

W
wade zhang 已提交
140 141 142 143 144 145
- 删除指定名称的用户定义函数:
```
DROP FUNCTION ids(X);
```

- ids(X):此参数的含义与 CREATE 指令中的 ids(X) 参数一致,也即要删除的函数的名字,例如 
G
gccgdb1234 已提交
146 147 148
```sql
DROP FUNCTION add_one;
```
W
wade zhang 已提交
149 150 151 152
- 显示系统中当前可用的所有 UDF:
```sql
SHOW FUNCTIONS;
```
B
Bo Ding 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173

### 调用 UDF

在 SQL 指令中,可以直接以在系统中创建 UDF 时赋予的函数名来调用用户定义函数。例如:
```sql
SELECT X(c) FROM table/stable;
```

表示对名为 c 的数据列调用名为 X 的用户定义函数。SQL 指令中用户定义函数可以配合 WHERE 等查询特性来使用。

## UDF 的一些使用限制

在当前版本下,使用 UDF 存在如下这些限制:

1. 在创建和调用 UDF 时,服务端和客户端都只支持 Linux 操作系统;
2. UDF 不能与系统内建的 SQL 函数混合使用,暂不支持在一条 SQL 语句中使用多个不同名的 UDF ;
3. UDF 只支持以单个数据列作为输入;
4. UDF 只要创建成功,就会被持久化存储到 MNode 节点中;
5. 无法通过 RESTful 接口来创建 UDF;
6. UDF 在 SQL 中定义的函数名,必须与 .so 库文件实现中的接口函数名前缀保持一致,也即必须是 udfNormalFunc 的名称,而且不可与 TDengine 中已有的内建 SQL 函数重名。

W
wade zhang 已提交
174
## 示例代码
B
Bo Ding 已提交
175

G
gccgdb1234 已提交
176 177 178 179
### 标量函数示例 [add_one](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c)

<details>
<summary>add_one.c</summary>
B
Bo Ding 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct SUdfInit{
 int maybe_null;       /* 1 if function can return NULL */
 int decimals;     /* for real functions */
 long long length;       /* For string functions */
 char  *ptr;            /* free pointer for function data */
 int const_item;       /* 0 if result is independent of arguments */
} SUdfInit;

void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
                        int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
   int i;
   int r = 0;
  //  printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
   if (itype == 4) {
     for(i=0;i<numOfRows;++i) {
      //  printf("input %d - %d", i, *((int *)data + i));
       *((int *)dataOutput+i)=*((int *)data + i) + 1;
      //  printf(", output %d\n", *((int *)dataOutput+i));
       if (tsOutput) {
         *(long long*)tsOutput=1000000;
       }
     }
     *numOfOutput=numOfRows;

    //  printf("add_one out, numOfOutput:%d\n", *numOfOutput);
   }
}
```

G
gccgdb1234 已提交
215 216 217 218 219 220
</details>

### 向量函数示例 [abs_max](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c)

<details>
<summary>abs_max.c</summary>
B
Bo Ding 已提交
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

typedef struct SUdfInit{
 int maybe_null;       /* 1 if function can return NULL */
 int decimals;     /* for real functions */
 int64_t length;       /* For string functions */
 char  *ptr;            /* free pointer for function data */
 int const_item;       /* 0 if result is independent of arguments */
} SUdfInit;


#define TSDB_DATA_INT_NULL              0x80000000L
#define TSDB_DATA_BIGINT_NULL           0x8000000000000000L

void abs_max(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
                        int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
   int i;
   int64_t r = 0;
   // printf("abs_max input data:%p, type:%d, rows:%d, ts:%p, %" PRId64 ", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
   if (itype == 5) {
     r=*(int64_t *)dataOutput;
     *numOfOutput=0;

     for(i=0;i<numOfRows;++i) {
       if (*((int64_t *)data + i) == TSDB_DATA_BIGINT_NULL) {
         continue;
       }

       *numOfOutput=1;
       //int64_t v = abs(*((int64_t *)data + i));
       int64_t v = *((int64_t *)data + i);
       if (v < 0) {
          v = 0 - v;
       }

       if (v > r) {
          r = v;
       }
     }

     *(int64_t *)dataOutput=r;

   //   printf("abs_max out, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
   }else {
     *numOfOutput=0;
   }
}



void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
   int i;
   //int64_t r = 0;
   // printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
   // *numOfOutput=1;
   // printf("abs_max finalize, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}

void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
   int64_t r = 0;

   if (numOfRows > 0) {
      r = *((int64_t *)data);
   }
   // printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
   for (int i = 1; i < numOfRows; ++i) {
   //   printf("abs_max_merge %d - %" PRId64"\n", i, *((int64_t *)data + i));
     if (*((int64_t*)data + i) > r) {
        r= *((int64_t*)data + i);
     }
   }

   *(int64_t*)dataOutput=r;
   if (numOfRows > 0) {
     *numOfOutput=1;
   } else {
     *numOfOutput=0;
   }

   // printf("abs_max_merge, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}


int abs_max_init(SUdfInit* buf) {
   // printf("abs_max init\n");
   return 0;
}


void abs_max_destroy(SUdfInit* buf) {
   // printf("abs_max destroy\n");
}
```

G
gccgdb1234 已提交
320 321 322 323 324 325
</details>

### 使用中间计算结果示例 [demo](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c)

<details>
<summary>demo.c</summary>
B
Bo Ding 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct SUdfInit{
 int maybe_null;       /* 1 if function can return NULL */
 int decimals;     /* for real functions */
 long long length;       /* For string functions */
 char  *ptr;            /* free pointer for function data */
 int const_item;       /* 0 if result is independent of arguments */
} SUdfInit;

typedef struct SDemo{
  double sum;
  int num;
  short otype;
}SDemo;

#define FLOAT_NULL            0x7FF00000              // it is an NAN
#define DOUBLE_NULL           0x7FFFFF0000000000L     // it is an NAN


void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
                        int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
   int i;
   double r = 0;
   SDemo *p = (SDemo *)interBuf;
   SDemo *q = (SDemo *)dataOutput;
   printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);

   for(i=0;i<numOfRows;++i) {
     if (itype == 4) {
       r=*((int *)data+i);
     } else if (itype == 6) {
       r=*((float *)data+i);
     } else if (itype == 7) {
       r=*((double *)data+i);
     }

     p->sum += r*r;
   }

   p->otype = otype;
   p->num += numOfRows;

   q->sum = p->sum;
   q->num = p->num;
   q->otype = p->otype;

   *numOfOutput=1;

   printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}


void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
   int i;
   SDemo *p = (SDemo *)data;
   SDemo res = {0};
   printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);

   for(i=0;i<numOfRows;++i) {
     res.sum += p->sum * p->sum;
     res.num += p->num;
     p++;
   }

   p->sum = res.sum;
   p->num = res.num;

   *numOfOutput=1;

   printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}



void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
   SDemo *p = (SDemo *)interBuf;
   printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
   if (p->otype == 6) {
     if (p->num != 30000) {
       *(unsigned int *)dataOutput = FLOAT_NULL;
     } else {
       *(float *)dataOutput = (float)(p->sum / p->num);
     }
     printf("finalize values:%f\n", *(float *)dataOutput);
   } else if (p->otype == 7) {
     if (p->num != 30000) {
       *(unsigned long long *)dataOutput = DOUBLE_NULL;
     } else {
       *(double *)dataOutput = (double)(p->sum / p->num);
     }
     printf("finalize values:%f\n", *(double *)dataOutput);
   }

   *numOfOutput=1;

   printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
}


int demo_init(SUdfInit* buf) {
   printf("demo init\n");
   return 0;
}


void demo_destroy(SUdfInit* buf) {
   printf("demo destroy\n");
}
```
G
gccgdb1234 已提交
440 441

</details>