## Implement a UDF in Python
### Prepare Environment
1. Prepare Python Environment
Please follow standard procedure of python environment preparation.
2. Install Python package `taospyudf`
pip3 install taospyudf
During this process, some C++ code needs to be compiled. So it's required to have `cmake` and `gcc` on your system. The compiled `libtaospyudf.so` will be automatically copied to `/usr/local/lib` path. If you are not root user, please use `sudo`. After installation is done, please check using the command below.
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
Then execute the command below.
3. If you want to utilize some 3rd party python packages using `PYTHONPATH`, please set configuration parameter `UdfdLdLibPath` to the value of `PYTHONPATH` before starting `taosd`.
4. Launch `taosd` service
Please refer to [Get Started](../get-started)
### Interface definition
#### Introduction to Interface
Implement the specified interface functions when implementing a UDF in Python.
- implement `process` function for the scalar UDF.
- implement `start`, `reduce`, `finish` for the aggregate UDF.
- implement `init` for initialization and `destroy` for termination.
### Implement a Scalar UDF in Python
#### Scalar UDF Interface
The implementation of a scalar UDF is described as follows:
def process(input: datablock) -> tuple[output_type]:
Description: this function prcesses datablock, which is the input; you can use datablock.data(row, col) to access the python object at location(row,col); the output is a tuple object consisted of objects of type outputtype
### Aggregate UDF Interface
The implementation of an aggregate function is described as follows:
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
Description: first the start() is invoked to generate the initial result `buffer`; then the input data is divided into multiple row blocks, and reduce() is invoked for each block `inputs` and current intermediate result `buf`; finally finish() is invoked to generate the final result from intermediate `buf`, the final result can only contains 0 or 1 data.
#### Initialization and Cleanup Interface
def init()
def destroy()
Description: init() does the work of initialization before processing any data; destroy() does the work of cleanup after the data is processed.
### Python UDF Template
#### Scalar Template
def init():
# initialization
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
def process(input: datablock) -> tuple[output_type]:
### Implement an Aggregate UDF in Python
Note:process() must be implemeted, init() and destroy() must be defined too but they can do nothing.
The implementation of an aggregate function is described as follows:
#### Aggregate Template
def init():
......@@ -311,33 +374,7 @@ def finish(buf: bytes) -> output_type:
#return obj of type outputtype
### Python UDF Interface Definition
#### Scalar interface
def process(input: datablock) -> tuple[output_type]:
- `input` is a data block two-dimension matrix-like object, of which method `data(row, col)` returns the Python object located at location (`row`, `col`)
- return a Python tuple object, of which each item is a Python object of type `output_type`
#### Aggregate Interface
def start() -> bytes:
def reduce(input: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
- first `start()` is called to return the initial result in type `bytes`
- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer.
- finally, the `finish` function is called on the intermediate result `buf` and outputs 0 or 1 data of type `output_type`
#### Initialization and Cleanup Interface
def init()
def destroy()
Implement `init` for initialization and `destroy` for termination.
Note: aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be impemented. start() generates the initial result in buffer, then the input data is divided into multiple row data blocks, reduce() is invoked for each data block `inputs` and intermediate `buf`, finally finish() is invoked to generate final result from the intermediate result `buf`.
### Data Mapping between TDengine SQL and Python UDF
......@@ -353,15 +390,460 @@ The following table describes the mapping between TDengine SQL data type and Pyt
|TIMESTAMP | int |
|JSON and other types | Not Supported |
### Installing Python UDF
1. Install Python package `taospyudf` that executes Python UDF
sudo pip install taospyudf
### Development Guide
In this section we will demonstrate 5 examples of developing UDF in Python language. In this guide, you will learn the development skills from easy case to hard case, the examples include:
1. A scalar function which accepts only one integer as input and outputs ln(n^2 + 1)。
2. A scalar function which accepts n integers, like(x1, x2, ..., xn)and output the sum of the product of each input and its sequence number, i.e. x1 + 2 * x2 + ... + n * xn。
3. A scalar function which accepts a timestamp and output the next closest Sunday of the timestamp. In this case, we will demonstrate how to use 3rd party library `moment`.
4. An aggregate function which calculates the difference between the maximum and the minimum of a specific column, i.e. same functionality of built-in spread().
In the guide, some debugging skills of using Python UDF will be explained too.
We assume you are using Linux system and already have TDengine and Python 3.x.
Note:**You can't use print() function to output log inside a UDF, you have to write the log to a specific file or use logging module of Python.**
#### Sample 1: Simplest UDF
This scalar UDF accepts an integer as input and output ln(n^2 + 1).
Firstly, please compose a Python source code file in your system and save it, e.g. `/root/udf/myfun.py`, the code is like below.
from math import log
def init():
def destroy():
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
This program consists of 3 functions, init() and destroy() do nothing, but they have to be defined even though there is nothing to do in them because they are critical parts of a python UDF. The most important function is process(), which accepts a data block and the data block object has two methods:
1. shape() returns the number of rows and the number of columns of the data block
2. data(i, j) returns the value at (i,j) in the block
The output of the process() function of a scalar UDF returns exactly same number of data as the number of input rows. We will ignore the number of columns because we just want to compute on the first column.
Then, we create the UDF using the SQL command below.
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
Here is the output example, it may change a little depending on your version being used.
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)
Then, we used the `show` command to prove the creation of the UDF is successful.
taos> show functions;
name |
myfun |
Query OK, 1 row(s) in set (0.005767s)
Next, we can try to test the function. Before executing the UDF, we need to prepare some data using the command below in TDengine CLI.
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
Execute the UDF to test it:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
Unfortunately, the UDF execution failed. We need to check the log `udfd` daemon to find out why.
tail -10 /var/log/taos/udfd.log
Below is the output.
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
From the error message we can find out that `libtaospyudf.so` was not loaded successfully. Please refer to the [Prepare Environment] section.
After correcting environment issues, execute the UDF:
taos> select myfun(v1) from t;
myfun(v1) |
0.693147181 |
1.609437912 |
2.302585093 |
Now, we have finished the first PDF in Python, and learned some basic debugging skills.
#### Sample 2: Abnormal Processing
The `myfun` UDF example in sample 1 has passed, but it has two drawbacks.
1. It accepts only one column of data as input, and doesn't throw exception if you passes multiple columns.
taos> select myfun(v1, v2) from t;
myfun(v1, v2) |
0.693147181 |
1.609437912 |
2.302585093 |
2. `null` value is not processed. We except the program to throw exception and terminate if `null` is passed as input.
So, we try to optimize the process() function as below.
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception(f"require 1 parameter but given {cols}")
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
The update the UDF with command below.
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
At this time, if we pass two arguments to `myfun`, the execution would fail.
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
However, the exception is not shown to end user, but displayed in the log file `/var/log/taos/taospyudf.log`
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
Now, we have learned how to update a UDF and check the log of a UDF.
Note: Prior to TDengine (excluding), updating a UDF requires to restart `taosd` service. After, restarting is not required.
#### Sample 3: UDF with n arguments
A UDF which accepts n intergers, likee (x1, x2, ..., xn) and output the sum of the product of each value and its sequence number: 1 * x1 + 2 * x2 + ... + n * xn. If there is `null` in the input, then the result is `null`. The difference from sample 1 is that it can accept any number of columns as input and process each column. Assume the program is written in /root/udf/nsum.py:
def init():
def destroy():
def process(block):
rows, cols = block.shape()
result = []
for i in range(rows):
total = 0
for j in range(cols):
v = block.data(i, j)
if v is None:
total = None
total += (j + 1) * block.data(i, j)
return result
Crate and test the UDF:
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t;
ts | v1 | v2 | v3 | nsum(v1, v2, v3) |
2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 |
2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 |
2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 |
2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |
Query OK, 4 row(s) in set (0.010653s)
Sample 4: Utilize 3rd party package
A UDF which accepts a timestamp and output the next closed Sunday. This sample requires to use third party package `moment`, you need to install it firslty.
pip3 install moment
Then compose the Python code in /root/udf/nextsunday.py
import moment
def init():
def destroy():
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception("require only 1 parameter")
if not type(block.data(0, 0)) is int:
raise Exception("type error")
return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
for i in range(rows)]
UDF framework will map the TDengine timestamp to Python int type, so this function only accepts an integer representing millisecond. process() firstly validates the parameters, then use `moment` to replace the time, format the result and output.
Create and test the UDF.
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
If your `taosd` is started using `systemd`, you man encounter the error below. Next we will show how to debug.
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
tail -20 taospyudf.log
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
This is because `moment` doesn't exist in the default library search path of python UDF, please check the log file `taosdpyudf.log`.
grep 'sys path' taospyudf.log | tail -1
2023-05-25 10:58:48.554 INFO [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']
You may find that the default library search path is `/lib/python3/dist-packages` (just for example, it may be different in your system), but `moment` is installed to `/usr/local/lib/python3.8/dist-packages` (for example, it may be different in your system). Then we change the library search path of python UDF.
Check `sys.path`
>>> import sys
>>> ":".join(sys.path)
Copy the output and edit /var/taos/taos.cfg to add below configuration parameter.
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
Save it, then restart `taosd`, using `systemctl restart taosd`, and test again, it will succeed this time.
taos> select ts, nextsunday(ts) from t;
ts | nextsunday(ts) |
2023-05-01 12:13:14.000 | 2023-05-07 |
2023-05-03 08:09:10.000 | 2023-05-07 |
2023-05-10 07:06:05.000 | 2023-05-14 |
2023-05-25 09:09:15.000 | 2023-05-28 |
Query OK, 4 row(s) in set (1.011474s)
#### Sample 5: Aggregate Function
An aggregate function which calculates the difference of the maximum and the minimum in a column. An aggregate funnction takes multiple rows as input and output only one data. The execution process of an aggregate UDF is like map-reduce, the framework divides the input into multiple parts, each mapper processes one block and the reducer aggregates the result of the mappers. The reduce() of Python UDF has the functionality of both map() and reduce(). The reduce() takes two arguments: the data to be processed; and the result of other tasks executing reduce(). For exmaple, assume the code is in `/root/udf/myspread.py`.
import io
import math
import pickle
LOG_FILE: io.TextIOBase = None
def init():
global LOG_FILE
LOG_FILE = open("/var/log/taos/spread.log", "wt")
log("init function myspead success")
def log(o):
LOG_FILE.write(str(o) + '\n')
def destroy():
log("close log file: spread.log")
def start():
return pickle.dumps((-math.inf, math.inf))
def reduce(block, buf):
max_number, min_number = pickle.loads(buf)
log(f"initial max_number={max_number}, min_number={min_number}")
rows, _ = block.shape()
for i in range(rows):
v = block.data(i, 0)
if v > max_number:
max_number = v
if v < min_number:
min_number = v
return pickle.dumps((max_number, min_number))
def finish(buf):
max_number, min_number = pickle.loads(buf)
return max_number - min_number
In this example, we implemented an aggregate function, and added some logging.
1. init() opens a file for logging
2. log() is the function for logging, it converts the input object to string and output with an end of line
3. destroy() closes the log file \
4. start() returns the initial buffer for storing the intermediate result
5. reduce() processes each daa block and aggregates the result
6. finish() converts the final buffer() to final result\
Create the UDF.
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
This SQL command has two important different points from the command creating scalar UDF.
1. keyword `aggregate` is used
2. keyword `bufsize` is used to specify the memory size for storing the intermediate result. In this example, the result is 32 bytes, but we specified 128 bytes for `bufsize`. You can use the `python` CLI to print actual size.
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
Test this function, you can see the result is same as built-in spread() function. \
taos> select myspread(v1) from t;
myspread(v1) |
5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
spread(v1) |
5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
At last, check the log file, we can see that the reduce() function is executed 3 times, max value is updated 3 times and min value is updated only one time.
root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
initial max_number=1, min_number=1
initial max_number=3, min_number=1
close log file: spread.log
### SQL Commands
1. Create Scalar UDF
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
2. Create Aggregate UDF
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
3. Update Scalar UDF
4. Update Aggregate UDF
Note: If keyword `AGGREGATE` used, the UDF will be treated as aggregate UDF despite what it was before; Similarly, if there is no keyword `aggregate`, the UDF will be treated as scalar function despite what it was before. 更
5. Show the UDF
The version of a UDF is increased by one every time it's updated.
select * from ins_functions \G;
6. Show and Drop existing UDF
SHOW functions;
DROP FUNCTION function_name;
### Python UDF Sample Code
### More Python UDF Samples
#### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
The `pybitand` function implements bitwise addition for multiple columns. If there is only one column, the column is returned. The `pybitand` function ignores null values.
......@@ -271,26 +271,88 @@ select max_vol(vol1,vol2,vol3,deviceid) from battery;
## 用 Python 语言实现 UDF
### 准备环境
1. 准备好 Python 运行环境
2. 安装 Python 包 `taospyudf`
pip3 install taospyudf
安装过程中会编译 C++ 源码,因此系统上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件自动会被复制到 /usr/local/lib/ 目录,因此如果是非 root 用户,安装时需加 sudo。安装完可以检查这个目录是否有了这个文件:
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
3. 如果 Python UDF 程序执行时,通过 PYTHONPATH 引用其它的包,可以设置 taos.cfg 的 UdfdLdLibPath 变量为PYTHONPATH的内容
4. 启动 `taosd` 服务
细节请参考 [快速开始](../get-started)
### 接口定义
#### 接口概述
使用 Python 语言实现 UDF 时,需要实现规定的接口函数
- 标量函数需要实现标量接口函数 process 。
- 聚合函数需要实现聚合接口函数 start ,reduce ,finish。
- 如果需要初始化,实现 init;如果需要清理工作,实现 destroy。
### 用 Python 实现标量函数
#### 标量函数接口
def process(input: datablock) -> tuple[output_type]:
- input:datablock 类似二维矩阵,通过成员方法 data(row,col)返回位于 row 行,col 列的 python 对象
- 返回值是一个 Python 对象元组,每个元素类型为输出类型。
#### 聚合函数接口
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
- 首先调用 start 生成最初结果 buffer
- 然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果
- 最后再调用 finish 从中间结果 buf 产生最终输出,最终输出只能含 0 或 1 条数据。
#### 初始化和销毁接口
def init()
def destroy()
- init 完成初始化工作
- destroy 完成清理工作
### 标量函数实现模板
def init():
# initialization
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
def process(input: datablock) -> tuple[output_type]:
### 用 Python 实现聚合函数
注意:定义标题函数最重要是要实现 process 函数,同时必须定义 init 和 destroy 函数即使什么都不做
### 聚合函数实现模板
......@@ -310,34 +372,9 @@ def finish(buf: bytes) -> output_type:
#return obj of type outputtype
### Python UDF 接口函数定义
#### 标量函数接口
def process(input: datablock) -> tuple[output_type]:
- input:datablock 类似二维矩阵,通过成员方法 data(row,col)返回位于 row 行,col 列的 python 对象
- 返回值是一个 Python 对象元组,每个元素类型为输出类型。
#### 聚合函数接口
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
首先调用 start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出,最终输出只能含 0 或 1 条数据。
#### 初始化和销毁接口
def init()
def destroy()
其中 init 完成初始化工作。 destroy 完成清理工作。
注意:定义聚合函数最重要是要实现 start, reduce 和 finish,且必须定义 init 和 destroy 函数。start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出。
### Python 和 TDengine之间的数据类型映射
### 数据类型映射
下表描述了TDengine SQL数据类型和Python数据类型的映射。任何类型的NULL值都映射成Python的None值。
......@@ -351,15 +388,461 @@ def destroy()
|TIMESTAMP | int |
|JSON and other types | 不支持 |
### Python UDF 环境的安装
1. 安装 taospyudf 包。此包执行Python UDF程序。
sudo pip install taospyudf
### 开发指南
本文内容由浅入深包括 4 个示例程序:
1. 定义一个只接收一个整数的标量函数: 输入 n, 输出 ln(n^2 + 1)。
2. 定义一个接收 n 个整数的标量函数, 输入 (x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和: x1 + 2 * x2 + ... + n * xn。
3. 定义一个标量函数,输入一个时间戳,输出距离这个时间最近的下一个周日。完成这个函数要用到第三方库 moment。我们在这个示例中讲解使用第三方库的注意事项。
4. 定义一个聚合函数,计算某一列最大值和最小值的差, 也就是实现 TDengien 内置的 spread 函数。
同时也包含大量实用的 debug 技巧。
本文假设你用的是 Linux 系统,且已安装好了 TDengine 和 Python 3.x。
注意:**UDF 内无法通过 print 函数输出日志,需要自己写文件或用 python 内置的 logging 库写文件**
#### 最简单的 UDF
编写一个只接收一个整数的 UDF 函数: 输入 n, 输出 ln(n^2 + 1)。
首先编写一个 Python 文件,存在系统某个目录,比如 /root/udf/myfun.py 内容如下
from math import log
def init():
def destroy():
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法:
1. shape() 返回数据块的行数和列数
2. data(i, j) 返回 i 行 j 列的数据
标量函数的 process 方法传人的数据块有多少行,就需要返回多少个数据。上述代码中我们忽略的列数,因为我们只想对每行的第一个数做计算。
接下来我们创建对应的 UDF 函数,在 TDengine CLI 中执行下面语句:
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功:
taos> show functions;
name |
myfun |
Query OK, 1 row(s) in set (0.005767s)
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据,在 TDengine CLI 中执行下述命令
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
测试 myfun 函数:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
查看 udfd 进程的日志
tail -10 /var/log/taos/udfd.log
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
错误很明确:没有加载到 Python 插件 libtaospyudf.so,如果遇到此错误,请参考前面的准备环境一节。
taos> select myfun(v1) from t;
myfun(v1) |
0.693147181 |
1.609437912 |
2.302585093 |
至此,我们完成了第一个 UDF 😊,并学会了简单的 debug 方法。
#### 示例二:异常处理
上面的 myfun 虽然测试测试通过了,但是有两个缺点:
1. 这个标量函数只接受 1 列数据作为输入,如果用户传入了多列也不会抛异常。
taos> select myfun(v1, v2) from t;
myfun(v1, v2) |
0.693147181 |
1.609437912 |
2.302585093 |
2. 没有处理 null 值。我们期望如果输入有 null,则会抛异常终止执行。
因此 process 函数改进如下:
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception(f"require 1 parameter but given {cols}")
return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
然后执行下面的语句更新已有的 UDF:
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
再传入 myfun 两个参数,就会执行失败了
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
但遗憾的是我们自定义的异常信息没有展示给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中:
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
至此,我们学会了如何更新 UDF,并查看 UDF 输出的错误日志。
(注:如果 UDF 更新后未生效,在 TDengine 以前(不含)的版本中需要重启 taosd,在 及之后的版本中不需要重启 taosd 即可生效。)
#### 示例三: 接收 n 个参数的 UDF
编写一个 UDF:输入(x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和: 1 * x1 + 2 * x2 + ... + n * xn。如果 x1 至 xn 中包含 null,则结果为 null。
这个示例与示例一的区别是,可以接受任意多列作为输入,且要处理每一列的值。编写 UDF 文件 /root/udf/nsum.py:
def init():
def destroy():
def process(block):
rows, cols = block.shape()
result = []
for i in range(rows):
total = 0
for j in range(cols):
v = block.data(i, j)
if v is None:
total = None
total += (j + 1) * block.data(i, j)
return result
创建 UDF:
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
测试 UDF:
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t;
ts | v1 | v2 | v3 | nsum(v1, v2, v3) |
2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 |
2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 |
2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 |
2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |
Query OK, 4 row(s) in set (0.010653s)
编写一个 UDF,输入一个时间戳,输出距离这个时间最近的下一个周日。比如今天是 2023-05-25, 则下一个周日是 2023-05-28。
完成这个函数要用到第三方库 momen。先安装这个库:
pip3 install moment
然后编写 UDF 文件 /root/udf/nextsunday.py
import moment
def init():
def destroy():
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception("require only 1 parameter")
if not type(block.data(0, 0)) is int:
raise Exception("type error")
return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
for i in range(rows)]
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只接受一个表示毫秒数的整数。process 方法先做参数检查,然后用 moment 包替换时间的星期为星期日,最后格式化输出。输出的字符串长度是固定的10个字符长,因此可以这样创建 UDF 函数:
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
此时测试函数,如果你是用 systemctl 启动的 taosd,肯定会遇到错误:
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
tail -20 taospyudf.log
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
这是因为 “moment” 所在位置不在 python udf 插件默认的库搜索路径中。怎么确认这一点呢?通过以下命令搜索 taospyudf.log:
grep 'sys path' taospyudf.log | tail -1
2023-05-25 10:58:48.554 INFO [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']
发现 python udf 插件默认搜索的第三方库安装路径是: /lib/python3/dist-packages,而 moment 默认安装到了 /usr/local/lib/python3.8/dist-packages。下面我们修改 python udf 插件默认的库搜索路径。
先打开 python3 命令行,查看当前的 sys.path
>>> import sys
>>> ":".join(sys.path)
复制上面脚本的输出的字符串,然后编辑 /var/taos/taos.cfg 加入以下配置:
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
保存后执行 systemctl restart taosd, 再测试就不报错了:
taos> select ts, nextsunday(ts) from t;
ts | nextsunday(ts) |
2023-05-01 12:13:14.000 | 2023-05-07 |
2023-05-03 08:09:10.000 | 2023-05-07 |
2023-05-10 07:06:05.000 | 2023-05-14 |
2023-05-25 09:09:15.000 | 2023-05-28 |
Query OK, 4 row(s) in set (1.011474s)
#### 示例五:聚合函数
聚合函数与标量函数的区别是:标量函数是多行输入对应多个输出,聚合函数是多行输入对应一个输出。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 处理一个块,reducer 再把 mapper 的结果做聚合。不一样的地方在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的功能又有 reduce 的功能。reduce 函数接受两个参数:一个是自己要处理的数据,一个是别的任务执行 reduce 函数的处理结果。如下面的示例 /root/udf/myspread.py:
import io
import math
import pickle
LOG_FILE: io.TextIOBase = None
def init():
global LOG_FILE
LOG_FILE = open("/var/log/taos/spread.log", "wt")
log("init function myspead success")
def log(o):
LOG_FILE.write(str(o) + '\n')
def destroy():
log("close log file: spread.log")
def start():
return pickle.dumps((-math.inf, math.inf))
def reduce(block, buf):
max_number, min_number = pickle.loads(buf)
log(f"initial max_number={max_number}, min_number={min_number}")
rows, _ = block.shape()
for i in range(rows):
v = block.data(i, 0)
if v > max_number:
max_number = v
if v < min_number:
min_number = v
return pickle.dumps((max_number, min_number))
def finish(buf):
max_number, min_number = pickle.loads(buf)
return max_number - min_number
1. init 函数不再是空函数,而是打开了一个文件用于写执行日志
2. log 函数是记录日志的工具,自动将传入的对象转成字符串,加换行符输出
3. destroy 函数用来在执行结束关闭文件
4. start 返回了初始的 buffer,用来存聚合函数的中间结果,我们把最大值初始化为负无穷大,最小值初始化为正无穷大
5. reduce 处理每个数据块并聚合结果
6. finish 函数将最终的 buffer 转换成最终的输出
执行下面的 SQL语句创建对应的 UDF:
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
这个 SQL 语句与创建标量函数的 SQL 语句有两个重要区别:
1. 增加了 aggregate 关键字
2. 增加了 bufsize 关键字,用来指定存储中间结果的内存大小,这个数值可以大于实际使用的数值。本例中间结果是两个浮点数组成的 tuple,序列化后实际占用大小只有 32 个字节,但指定的 bufsize 是128,可以用 python 命令行打印实际占用的字节数
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
测试这个函数,可以看到 myspread 的输出结果和内置的 spread 函数的输出结果是一致的。
taos> select myspread(v1) from t;
myspread(v1) |
5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
spread(v1) |
5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
最后,查看我们自己打印的执行日志,从日志可以看出,reduce 函数被执行了 3 次。执行过程中 max 值被更新了 4 次, min 值只被更新 1 次。
root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
initial max_number=1, min_number=1
initial max_number=3, min_number=1
close log file: spread.log
### SQL 命令
1. 创建标量函数的语法
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
2. 创建聚合函数的语法
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
3. 更新标量函数
4. 更新聚合函数
注意:如果加了 “AGGREGATE” 关键字,更新之后函数将被当作聚合函数,无论之前是什么类型的函数。相反,如果没有加 “AGGREGATE” 关键字,更新之后的函数将被当作标量函数,无论之前是什么类型的函数。
5. 查看函数信息
同名的 UDF 每更新一次,版本号会增加 1。
select * from ins_functions \G;
6. 查看和删除已有的 UDF
SHOW functions;
DROP FUNCTION function_name;
上面的命令可以查看 UDF 的完整信息
### Python UDF 示例代码
### 更多 Python UDF 示例代码
#### 标量函数示例 [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
pybitand 实现多列的按位与功能。如果只有一列,返回这一列。pybitand 忽略空值。
