@@ -270,15 +270,81 @@ select max_vol(vol1,vol2,vol3,deviceid) from battery;
## Implement a UDF in Python
### Prepare Environment
1. Prepare Python Environment
Please follow the standard procedure for preparing a Python environment.
2. Install Python package `taospyudf`
```shell
pip3 install taospyudf
```
During installation, some C++ code is compiled, so `cmake` and `gcc` must be available on your system. The compiled `libtaospyudf.so` is automatically copied to `/usr/local/lib`. If you are not the root user, use `sudo`. After installation, verify the result with the command below.
```shell
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
```
Then execute the command below.
```shell
ldconfig
```
3. If you want to use third-party Python packages in your Python UDF, set the configuration parameter `UdfdLdLibPath` to the value of `PYTHONPATH` before starting `taosd`.
4. Launch `taosd` service
Please refer to [Get Started](../get-started)
### Interface definition
#### Introduction to Interface
Implement the specified interface functions when implementing a UDF in Python.
- implement `process` function for the scalar UDF.
- implement `start`, `reduce`, `finish` for the aggregate UDF.
- implement `init` for initialization and `destroy` for termination.
### Implement a Scalar UDF in Python
#### Scalar UDF Interface
The implementation of a scalar UDF is described as follows:
Description: this function processes the input `datablock`; use `datablock.data(row, col)` to access the Python object at (row, col). The output is a tuple of objects of type `output_type`.
#### Aggregate UDF Interface
The implementation of an aggregate function is described as follows:
Description: first, start() is invoked to generate the initial result `buffer`. Then the input data is divided into multiple row blocks, and reduce() is invoked for each block `inputs` together with the current intermediate result `buf`. Finally, finish() is invoked to generate the final result from the intermediate `buf`; the final result can contain only 0 or 1 values.
#### Initialization and Cleanup Interface
```python
def init()
def destroy()
```
Description: init() does the work of initialization before processing any data; destroy() does the work of cleanup after the data is processed.
- `input` is a two-dimensional, matrix-like data block whose `data(row, col)` method returns the Python object at (`row`, `col`)
- returns a Python tuple in which each item is a Python object of type `output_type`
#### Aggregate Interface
```Python
def start() -> bytes:
def reduce(input: datablock, buf: bytes) -> bytes:
def finish(buf: bytes) -> output_type:
```
- first, `start()` is called and returns the initial result as `bytes`
- then the input data is divided into multiple data blocks; for each block, `reduce` is called with the data block `input` and the current intermediate result `buf`, and returns a new intermediate result buffer
- finally, `finish` is called on the intermediate result `buf` and outputs 0 or 1 values of type `output_type`
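
The call order described above can be imitated locally. The sketch below is only an illustration under stated assumptions: `FakeBlock` and `run_aggregate` are hypothetical helpers that stand in for the UDF framework, and the aggregate itself (counting non-null values in the first column) is chosen only for brevity.

```python
import pickle


class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def start():
    # Initial intermediate result, serialized to bytes.
    return pickle.dumps(0)


def reduce(block, buf):
    # Merge one data block into the intermediate result.
    count = pickle.loads(buf)
    rows, _ = block.shape()
    for i in range(rows):
        if block.data(i, 0) is not None:
            count += 1
    return pickle.dumps(count)


def finish(buf):
    # Turn the intermediate result into the single output value.
    return pickle.loads(buf)


def run_aggregate(blocks):
    """Hypothetical driver: start() once, reduce() per block, finish() once."""
    buf = start()
    for block in blocks:
        buf = reduce(block, buf)
    return finish(buf)
```

Calling `run_aggregate` with two blocks shows that the intermediate `bytes` buffer is the only state carried from one `reduce` call to the next.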
#### Initialization and Cleanup Interface
```Python
def init()
def destroy()
```
Implement `init` for initialization and `destroy` for termination.
Note: an aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be implemented. start() generates the initial result in a buffer; then the input data is divided into multiple row data blocks, and reduce() is invoked for each data block `inputs` and the intermediate `buf`; finally, finish() is invoked to generate the final result from the intermediate result `buf`.
### Data Mapping between TDengine SQL and Python UDF
...
...
@@ -353,15 +393,463 @@ The following table describes the mapping between TDengine SQL data type and Pyt
|TIMESTAMP | int |
|JSON and other types | Not Supported |
### Installing Python UDF
1. Install Python package `taospyudf` that executes Python UDF
```bash
sudo pip install taospyudf
ldconfig
```
### Development Guide
In this section we demonstrate 5 examples of developing UDFs in Python, progressing from easy cases to hard ones. The examples include:
1. A scalar function which accepts only one integer as input and outputs ln(n^2 + 1).
2. A scalar function which accepts n integers, like (x1, x2, ..., xn), and outputs the sum of the product of each input and its sequence number, i.e. x1 + 2 * x2 + ... + n * xn.
3. A scalar function which accepts a timestamp and outputs the next closest Sunday after the timestamp. In this case, we demonstrate how to use the third-party library `moment`.
4. An aggregate function which calculates the difference between the maximum and the minimum of a specific column, i.e. the same functionality as the built-in spread().
The guide also explains some techniques for debugging Python UDFs.
We assume you are using a Linux system and already have TDengine 3.0.4.0+ and Python 3.x.
Note: **You can't use the print() function to output logs inside a UDF; you have to write logs to a specific file or use the logging module of Python.**
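
One possible way to follow this advice is sketched below using the standard `logging` module. The log path and the logger name `myudf` are assumptions made for illustration; choose any location that the `taosd` process can write to.

```python
import logging
import os
import tempfile

# Hypothetical log location; pick any path the taosd process can write to.
LOG_PATH = os.path.join(tempfile.gettempdir(), "myudf.log")
logger = logging.getLogger("myudf")


def init():
    # Attach a file handler once, instead of calling print().
    handler = logging.FileHandler(LOG_PATH)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("init called")


def destroy():
    logger.info("destroy called")
    # Close and detach the handlers so the file is flushed and released.
    for handler in list(logger.handlers):
        handler.close()
        logger.removeHandler(handler)
```

Inside `process()` or `reduce()`, calls such as `logger.info(...)` would then append to the same file.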
#### Sample 1: Simplest UDF
This scalar UDF accepts an integer as input and outputs ln(n^2 + 1).
First, write a Python source file and save it on your system, e.g. as `/root/udf/myfun.py`.
This program consists of 3 functions. init() and destroy() do nothing, but they must still be defined because they are required parts of a Python UDF. The most important function is process(), which accepts a data block. The data block object has two methods:
1. shape() returns the number of rows and the number of columns of the data block
2. data(i, j) returns the value at (i,j) in the block
The process() function of a scalar UDF must return exactly as many values as there are input rows. Here we ignore the number of columns because we only compute on the first column.
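
A minimal sketch of a file matching this description follows. It is an illustration, not necessarily the exact sample code: `FakeBlock` is a hypothetical stand-in for the data block so that the file can be exercised outside `taosd`, and passing `NULL` inputs through as `None` is an assumption.

```python
from math import log


class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def init():
    pass  # nothing to set up, but the function must exist


def destroy():
    pass  # nothing to clean up, but the function must exist


def process(block):
    rows, _ = block.shape()  # the column count is ignored on purpose
    # One output value per input row, computed from the first column only.
    return tuple(
        None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1)
        for i in range(rows)
    )
```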
Then, we create the UDF using the SQL command below.
At this point, if we pass two arguments to `myfun`, the execution fails.
```sql
taos> select myfun(v1, v2) from t;

DB error: udf function execution failure (0.014643s)
```
However, the exception is not shown to the end user; it is written to the log file `/var/log/taos/taospyudf.log`:
```text
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
At:
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
```
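
A log entry like this is what one would expect from a `process()` that checks its column count before computing. The sketch below shows one way to write such a check; the exception text is copied from the log above, while `FakeBlock` is a hypothetical stand-in used only to run the code locally.

```python
from math import log


class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    if cols > 1:
        # Fail loudly; the message ends up in the UDF log file.
        raise Exception(f"require 1 parameter but given {cols}")
    return tuple(
        None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1)
        for i in range(rows)
    )
```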
Now, we have learned how to update a UDF and check the log of a UDF.
Note: In TDengine versions earlier than 3.0.5.0, updating a UDF requires restarting the `taosd` service. From 3.0.5.0 onward, restarting is not required.
#### Sample 3: UDF with n arguments
A UDF which accepts n integers, like (x1, x2, ..., xn), and outputs the sum of the product of each value and its sequence number: 1 * x1 + 2 * x2 + ... + n * xn. If there is a `null` in the input, the result is `null`. The difference from sample 1 is that it accepts any number of columns as input and processes each column. Assume the program is written in /root/udf/nsum.py:
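
The weighted sum described above could be written roughly as follows. This is a sketch, not necessarily the original `nsum.py`; `FakeBlock` is a hypothetical stand-in for the data block, included only so the code runs on its own.

```python
class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        total = 0
        for j in range(cols):
            value = block.data(i, j)
            if value is None:
                # Any null input makes the whole row's result null.
                total = None
                break
            total += (j + 1) * value  # sequence numbers start at 1
        result.append(total)
    return tuple(result)
```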
A UDF which accepts a timestamp and outputs the next closest Sunday. This sample requires the third-party package `moment`, which you need to install first.
```shell
pip3 install moment
```
Then compose the Python code in /root/udf/nextsunday.py
The UDF framework maps the TDengine timestamp to the Python int type, so this function only accepts an integer representing milliseconds. process() first validates the parameters, then uses `moment` to replace the time, formats the result, and outputs it.
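
To make the date arithmetic concrete, the sketch below swaps in the standard-library `datetime` module for `moment`, so it is not the sample's actual code. It assumes the millisecond timestamp is interpreted in UTC and that a Sunday maps to itself; `next_sunday` and `FakeBlock` are hypothetical names introduced for illustration.

```python
from datetime import datetime, timedelta, timezone


class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def next_sunday(ts_ms):
    # Assumption: interpret the millisecond timestamp in UTC.
    day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).date()
    # weekday(): Monday is 0 and Sunday is 6, so a Sunday maps to itself.
    days_ahead = (6 - day.weekday()) % 7
    return (day + timedelta(days=days_ahead)).isoformat()


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    if cols > 1:
        raise Exception(f"require 1 parameter but given {cols}")
    return tuple(
        None if block.data(i, 0) is None else next_sunday(block.data(i, 0))
        for i in range(rows)
    )
```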
You may find that the default library search path is `/lib/python3/dist-packages` (just for example, it may be different in your system), but `moment` is installed to `/usr/local/lib/python3.8/dist-packages` (for example, it may be different in your system). Then we change the library search path of python UDF.
Check `sys.path`, which must include the packages you install with pip3 command previously, as shown below:
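
When diagnosing such a path problem, it can help to record both `sys.path` and the location a package would be loaded from. The helper below is a hypothetical debugging aid built only on the standard library; `locate` and `report` are illustrative names, and the returned text is meant to be written to a log file rather than printed.

```python
import importlib.util
import sys


def locate(module_name):
    """Return where a top-level module would be loaded from, or None if it cannot be found."""
    spec = importlib.util.find_spec(module_name)
    return None if spec is None else spec.origin


def report(module_name):
    # Build a short text report suitable for writing to a UDF log file.
    lines = ["sys.path:"] + ["  " + p for p in sys.path]
    lines.append(f"{module_name}: {locate(module_name) or 'NOT FOUND'}")
    return "\n".join(lines)
```

For example, `report("moment")` ends with `NOT FOUND` as long as the directory that contains the package is missing from the search path.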
Save it, then restart `taosd` using `systemctl restart taosd` and test again; it will succeed this time.
Note: If your cluster consists of multiple `taosd` instances, you have to repeat the same process for each of them.
```sql
taos> select ts, nextsunday(ts) from t;
           ts            | nextsunday(ts) |
===========================================
 2023-05-01 12:13:14.000 | 2023-05-07     |
 2023-05-03 08:09:10.000 | 2023-05-07     |
 2023-05-10 07:06:05.000 | 2023-05-14     |
 2023-05-25 09:09:15.000 | 2023-05-28     |
Query OK, 4 row(s) in set (1.011474s)
```
#### Sample 5: Aggregate Function
An aggregate function which calculates the difference between the maximum and the minimum in a column. An aggregate function takes multiple rows as input and outputs only one value. The execution of an aggregate UDF resembles map-reduce: the framework divides the input into multiple parts, each mapper processes one block, and the reducer aggregates the results of the mappers. The reduce() of a Python UDF combines the functionality of both map() and reduce(). It takes two arguments: the data to be processed, and the result of other tasks executing reduce(). For example, assume the code is in `/root/udf/myspread.py`.
2. If `PYTHONPATH` is needed to find Python packages when the Python UDF executes, add the contents of `PYTHONPATH` to the `udfdLdLibPath` variable in the `taos.cfg` configuration file.
### Python UDF Sample Code
In this example, we implemented an aggregate function, and added some logging.
1. init() opens a file for logging
2. log() is the logging function; it converts the input object to a string and writes it followed by an end of line
3. destroy() closes the log file
4. start() returns the initial buffer for storing the intermediate result
5. reduce() processes each data block and aggregates the result
6. finish() converts the final buffer to the final result
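
The six functions listed above might fit together as in the sketch below. It is an illustration under assumptions rather than the original sample: the log path is hypothetical, the intermediate state is a pickled `(max, min)` tuple, and `FakeBlock` is a stand-in for the data block used only for local runs.

```python
import os
import pickle
import tempfile

# Hypothetical log location; pick any path the taosd process can write to.
LOG_PATH = os.path.join(tempfile.gettempdir(), "myspread.log")
log_file = None


class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def init():
    global log_file
    log_file = open(LOG_PATH, "a")


def log(obj):
    # Convert any object to text and end the line.
    log_file.write(str(obj) + "\n")


def destroy():
    log_file.close()


def start():
    # (max, min); None means no value has been seen yet.
    return pickle.dumps((None, None))


def reduce(block, buf):
    max_v, min_v = pickle.loads(buf)
    rows, _ = block.shape()
    for i in range(rows):
        value = block.data(i, 0)
        if value is None:
            continue
        if max_v is None or value > max_v:
            max_v = value
            log(f"max updated to {max_v}")
        if min_v is None or value < min_v:
            min_v = value
            log(f"min updated to {min_v}")
    return pickle.dumps((max_v, min_v))


def finish(buf):
    max_v, min_v = pickle.loads(buf)
    # No non-null input yields no result.
    return None if max_v is None else max_v - min_v
```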
This SQL command differs from the command that creates a scalar UDF in two important ways.
1. keyword `aggregate` is used
2. keyword `bufsize` is used to specify the memory size for storing the intermediate result. In this example, the result is 32 bytes, but we specified 128 bytes for `bufsize`. You can use the `python` CLI to print the actual size.
```python
>>> import pickle
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32
```
Test this function; you can see that the result is the same as that of the built-in spread() function.
```sql
taos> select myspread(v1) from t;
       myspread(v1)        |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
        spread(v1)         |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
```
Finally, check the log file: the reduce() function was executed 3 times, the max value was updated 3 times, and the min value was updated only once.
Note: If the keyword `AGGREGATE` is used, the UDF is treated as an aggregate UDF regardless of what it was before; similarly, if the keyword `AGGREGATE` is absent, the UDF is treated as a scalar function regardless of what it was before.
5. Show the UDF
The version of a UDF is increased by one every time it's updated.
```sql
select * from ins_functions \G;
```
6. Show and Drop existing UDF
```sql
SHOW FUNCTIONS;
DROP FUNCTION function_name;
```
### More Python UDF Samples
#### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
The `pybitand` function implements bitwise AND for multiple columns. If there is only one column, the column is returned. The `pybitand` function ignores null values.
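
The behavior described in this paragraph can be sketched as follows. This is written from the description only and is not a copy of the linked file, whose handling of nulls may differ; `FakeBlock` is a hypothetical stand-in for the data block, used for local runs.

```python
class FakeBlock:
    """Hypothetical stand-in for the framework's data block, for local runs only."""

    def __init__(self, rows):
        self._rows = rows

    def shape(self):
        # Returns (number of rows, number of columns).
        return len(self._rows), (len(self._rows[0]) if self._rows else 0)

    def data(self, row, col):
        return self._rows[row][col]


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        acc = None
        for j in range(cols):
            value = block.data(i, j)
            if value is None:
                continue  # null values are skipped
            acc = value if acc is None else acc & value
        # A row with only nulls stays null; a single column is returned as-is.
        result.append(acc)
    return tuple(result)
```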