提交 b396cbd3 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-21161-3.0

......@@ -6,10 +6,12 @@ description: This document describes how to create user-defined functions (UDF),
The built-in functions of TDengine may not be sufficient for the use cases of every application. In this case, you can define custom functions for use in TDengine queries. These are known as user-defined functions (UDF). A user-defined function takes one column of data or the result of a subquery as its input.
TDengine supports user-defined functions written in C or C++. This document describes the usage of user-defined functions.
User-defined functions can be scalar functions or aggregate functions. Scalar functions, such as `abs`, `sin`, and `concat`, output a value for every row of data. Aggregate functions, such as `avg` and `max` output one value for multiple rows of data.
TDengine supports user-defined functions written in C or Python. This document describes the usage of user-defined functions.
## Implement a UDF in C
When you create a user-defined function, you must implement standard interface functions:
- For scalar functions, implement the `scalarfn` interface function.
- For aggregate functions, implement the `aggfn_start`, `aggfn`, and `aggfn_finish` interface functions.
......@@ -17,7 +19,7 @@ When you create a user-defined function, you must implement standard interface f
There are strict naming conventions for these interface functions. The names of the start, finish, init, and destroy interfaces must be <udf-name\>_start, <udf-name\>_finish, <udf-name\>_init, and <udf-name\>_destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
## Implementing a Scalar Function
### Implementing a Scalar Function in C
The implementation of a scalar function is described as follows:
```c
#include "taos.h"
......@@ -49,7 +51,7 @@ int32_t scalarfn_destroy() {
```
Replace `scalarfn` with the name of your function.
## Implementing an Aggregate Function
### Implementing an Aggregate Function in C
The implementation of an aggregate function is described as follows:
```c
......@@ -100,7 +102,7 @@ int32_t aggfn_destroy() {
```
Replace `aggfn` with the name of your function.
## Interface Functions
### UDF Interface Definition in C
There are strict naming conventions for interface functions. The names of the start, finish, init, and destroy interfaces must be <udf-name\>_start, <udf-name\>_finish, <udf-name\>_init, and <udf-name\>_destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
......@@ -108,8 +110,7 @@ Interface functions return a value that indicates whether the operation was succ
For information about the parameters for interface functions, see Data Model
### Interfaces for Scalar Functions
#### Scalar Interface
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
Replace `scalarfn` with the name of your function. This function performs scalar calculations on data blocks. You can configure a value through the parameters in the `resultColumn` structure.
......@@ -118,7 +119,7 @@ The parameters in the function are defined as follows:
- inputDataBlock: The data block to input.
- resultColumn: The column to output. The column to output.
### Interfaces for Aggregate Functions
#### Aggregate Interface
`int32_t aggfn_start(SUdfInterBuf *interBuf)`
......@@ -126,7 +127,7 @@ The parameters in the function are defined as follows:
`int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
Replace `aggfn` with the name of your function. In the function, aggfn_start is called to generate a result buffer. Data is then divided between multiple blocks, and aggfn is called on each block to update the result. Finally, aggfn_finish is called to generate final results from the intermediate results. The final result contains only one or zero data points.
Replace `aggfn` with the name of your function. In the function, aggfn_start is called to generate a result buffer. Data is then divided between multiple blocks, and the `aggfn` function is called on each block to update the result. Finally, aggfn_finish is called to generate the final results from the intermediate results. The final result contains only one or zero data points.
The parameters in the function are defined as follows:
- interBuf: The intermediate result buffer.
......@@ -135,15 +136,15 @@ The parameters in the function are defined as follows:
- result: The final result.
### Initializing and Terminating User-Defined Functions
#### Initialization and Cleanup Interface
`int32_t udf_init()`
`int32_t udf_destroy()`
Replace `udf`with the name of your function. udf_init initializes the function. udf_destroy terminates the function. If it is not necessary to initialize your function, udf_init is not required. If it is not necessary to terminate your function, udf_destroy is not required.
Replace `udf` with the name of your function. udf_init initializes the function. udf_destroy terminates the function. If it is not necessary to initialize your function, udf_init is not required. If it is not necessary to terminate your function, udf_destroy is not required.
## Data Structure of User-Defined Functions
### Data Structures for UDF in C
```c
typedef struct SUdfColumnMeta {
int16_t type;
......@@ -193,7 +194,7 @@ typedef struct SUdfInterBuf {
```
The data structure is described as follows:
- The SUdfDataBlock block includes the number of rows (numOfRows) and number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn.
- The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn.
- SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData).
- The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.
......@@ -201,9 +202,9 @@ The data structure is described as follows:
Additional functions are defined in `taosudf.h` to make it easier to work with these structures.
## Compile UDF
### Compiling C UDF
To use your user-defined function in TDengine, first compile it to a dynamically linked library (DLL).
To use your user-defined function in TDengine, first, compile it to a shared library.
For example, the sample UDF `bit_and.c` can be compiled into a DLL as follows:
......@@ -213,12 +214,9 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
The generated DLL file `libbitand.so` can now be used to implement your function. Note: GCC 7.5 or later is required.
## Manage and Use User-Defined Functions
After compiling your function into a DLL, you add it to TDengine. For more information, see [User-Defined Functions](../12-taos-sql/26-udf.md).
### UDF Sample Code in C
## Sample Code
### Sample scalar function: [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c)
#### Scalar function: [bit_and](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/bit_and.c)
The bit_and function implements bitwise addition for multiple columns. If there is only one column, the column is returned. The bit_and function ignores null values.
......@@ -231,7 +229,7 @@ The bit_and function implements bitwise addition for multiple columns. If there
</details>
### Sample aggregate function: [l2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/l2norm.c)
#### Aggregate function 1: [l2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/l2norm.c)
The l2norm function finds the second-order norm for all data in the input column. This squares the values, takes a cumulative sum, and finds the square root.
......@@ -243,3 +241,151 @@ The l2norm function finds the second-order norm for all data in the input column
```
</details>
#### Aggregate function 2: [max_vol](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/max_vol.c)
The max_vol function returns a string concatenating the deviceId column, the row number and column number of the maximum voltage and the maximum voltage given several voltage columns as input.
Create Table:
```bash
create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16));
```
Create the UDF:
```bash
create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C';
```
Use the UDF in the query:
```bash
select max_vol(vol1,vol2,vol3,deviceid) from battery;
```
<details>
<summary>max_vol.c</summary>
```c
{{#include tests/script/sh/max_vol.c}}
```
</details>
## Implement a UDF in Python
Implement the specified interface functions when implementing a UDF in Python.
- implement `process` function for the scalar UDF。
- implement `start`, `reduce`, `finish` for the aggregate UDF。
- implement `init` for initialization and `destroy` for termination。
### Implement a Scalar UDF in Python
The implementation of a scalar UDF is described as follows:
```Python
def init():
# initialization
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
```
### Implement an Aggregate UDF in Python
The implementation of an aggregate function is described as follows:
```Python
def init():
#initialization
def destroy():
#destroy
def start() -> bytes:
#return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
#return obj of type outputtype
```
### Python UDF Interface Definition
#### Scalar interface
```Python
def process(input: datablock) -> tuple[output_type]:
```
- `input` is a data block two-dimension matrix-like object, of which method `data(row, col)` returns the Python object located at location (`row`, `col`)
- return a Python tuple object, of which each item is a Python object of type `output_type`
#### Aggregate Interface
```Python
def start() -> bytes:
def reduce(input: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
```
- first `start()` is called to return the initial result in type `bytes`
- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer.
- finally, the `finish` function is called on the intermediate result `buf` and outputs 0 or 1 data of type `output_type`
#### Initialization and Cleanup Interface
```Python
def init()
def destroy()
```
Implement `init` for initialization and `destroy` for termination.
### Data Mapping between TDengine SQL and Python UDF
The following table describes the mapping between TDengine SQL data type and Python UDF Data Type. The `NULL` value of all TDengine SQL types is mapped to the `None` value in Python.
| **TDengine SQL Data Type** | **Python Data Type** |
| :-----------------------: | ------------ |
|TINYINT / SMALLINT / INT / BIGINT | int |
|TINYINT UNSIGNED / SMALLINT UNSIGNED / INT UNSIGNED / BIGINT UNSIGNED | int |
|FLOAT / DOUBLE | float |
|BOOL | bool |
|BINARY / VARCHAR / NCHAR | bytes|
|TIMESTAMP | int |
|JSON and other types | Not Supported |
### Installing Python UDF
1. Install Python package `taospyudf` that executes Python UDF
```bash
sudo pip install taospyudf
ldconfig
```
2. If PYTHONPATH is needed to find Python packages when the Python UDF executes, include the PYTHONPATH contents into the udfdLdLibPath variable of the taos.cfg configuration file
### Python UDF Sample Code
#### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
The `pybitand` function implements bitwise addition for multiple columns. If there is only one column, the column is returned. The `pybitand` function ignores null values.
<details>
<summary>pybitand.py</summary>
```Python
{{#include tests/script/sh/pybitand.py}}
```
</details>
#### Aggregate Function [pyl2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pyl2norm.py)
The `pyl2norm` function finds the second-order norm for all data in the input column. This squares the values, takes a cumulative sum, and finds the square root.
<details>
<summary>pyl2norm.py</summary>
```c
{{#include tests/script/sh/pyl2norm.py}}
```
</details>
## Manage and Use UDF
You need to add UDF to TDengine before using it in SQL queries. For more information about how to manage UDF and how to invoke UDF, please see [Manage and Use UDF](../12-taos-sql/26-udf.md).
......@@ -72,8 +72,8 @@ database_option: {
- 0: The database can contain multiple supertables.
- 1: The database can contain only one supertable.
- STT_TRIGGER: specifies the number of file merges triggered by flushed files. The default is 8, ranging from 1 to 16. For high-frequency scenarios with few tables, it is recommended to use the default configuration or a smaller value for this parameter; For multi-table low-frequency scenarios, it is recommended to configure this parameter with a larger value.
- TABLE_PREFIX:The prefix length in the table name that is ignored when distributing table to vnode based on table name.
- TABLE_SUFFIX:The suffix length in the table name that is ignored when distributing table to vnode based on table name.
- TABLE_PREFIX: The prefix in the table name that is ignored when distributing a table to a vgroup when it's a positive number, or only the prefix is used when distributing a table to a vgroup, the default value is 0; For example, if the table name v30001, then "0001" is used if TSDB_PREFIX is set to 2 but "v3" is used if TSDB_PREFIX is set to -2; It can help you to control the distribution of tables.
- TABLE_SUFFIX:The suffix in the table name that is ignored when distributing a table to a vgroup when it's a positive number, or only the suffix is used when distributing a table to a vgroup, the default value is 0; For example, if the table name v30001, then "v300" is used if TSDB_SUFFIX is set to 2 but "01" is used if TSDB_SUFFIX is set to -2; It can help you to control the distribution of tables.
- TSDB_PAGESIZE: The page size of the data storage engine in a vnode. The unit is KB. The default is 4 KB. The range is 1 to 16384, that is, 1 KB to 16 MB.
- WAL_RETENTION_PERIOD: specifies the maximum time of which WAL files are to be kept for consumption. This parameter is used for data subscription. Enter a time in seconds. The default value 0. A value of 0 indicates that WAL files are not required to keep for consumption. Alter it with a proper value at first to create topics.
- WAL_RETENTION_SIZE: specifies the maximum total size of which WAL files are to be kept for consumption. This parameter is used for data subscription. Enter a size in KB. The default value is 0. A value of 0 indicates that the total size of WAL files to keep for consumption has no upper limit.
......
......@@ -120,6 +120,9 @@ Provides information about user-defined functions.
| 5 | create_time | TIMESTAMP | Creation time |
| 6 | code_len | INT | Length of the source code |
| 7 | bufsize | INT | Buffer size |
| 8 | func_language | BINARY(31) | UDF programming language |
| 9 | func_body | BINARY(16384) | UDF function body |
| 10 | func_version | INT | UDF function version. starting from 0. Increasing by 1 each time it is updated|
## INS_INDEXES
......
......@@ -7,17 +7,18 @@ description: This document describes the SQL statements related to user-defined
You can create user-defined functions and import them into TDengine.
## Create UDF
SQL command can be executed on the host where the generated UDF DLL resides to load the UDF DLL into TDengine. This operation cannot be done through REST interface or web console. Once created, any client of the current TDengine can use these UDF functions in their SQL commands. UDF are stored in the management node of TDengine. The UDFs loaded in TDengine would be still available after TDengine is restarted.
SQL command can be executed on the host where the generated UDF DLL resides to load the UDF DLL into TDengine. This operation cannot be done through REST interface or web console. Once created, any client of the current TDengine can use these UDF functions in their SQL commands. UDF is stored in the management node of TDengine. The UDFs loaded in TDengine would be still available after TDengine is restarted.
When creating UDF, the type of UDF, i.e. a scalar function or aggregate function must be specified. If the specified type is wrong, the SQL statements using the function would fail with errors. The input data type and output data type must be consistent with the UDF definition.
- Create Scalar Function
```sql
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
CREATE [OR REPLACE] FUNCTION function_name AS library_path OUTPUTTYPE output_type [LANGUAGE 'C|Python'];
```
- function_name: The scalar function name to be used in SQL statement which must be consistent with the UDF name and is also the name of the compiled DLL (.so file).
- library_path: The absolute path of the DLL file including the name of the shared object file (.so). The path must be quoted with single or double quotes.
- OR REPLACE: if the UDF exists, the UDF properties are modified
- function_name: The scalar function name to be used in the SQL statement
- LANGUAGE 'C|Python': the programming language of UDF. Now C or Python is supported. If this clause is omitted, C is assumed as the programming language.
- library_path: For C programming language, The absolute path of the DLL file including the name of the shared object file (.so). For Python programming language, the absolute path of the Python UDF script. The path must be quoted with single or double quotes.
- output_type: The data type of the results of the UDF.
For example, the following SQL statement can be used to create a UDF from `libbitand.so`.
......@@ -25,14 +26,20 @@ CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
```sql
CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
```
For Example, the following SQL statement can be used to modify the existing function `bit_and`. The OUTPUT type is changed to BIGINT and the programming language is changed to Python.
```sql
CREATE OR REPLACE FUNCTION bit_and AS "/home/taos/udf_example/bit_and.py" OUTPUTTYPE BIGINT LANGUAGE 'Python';
```
- Create Aggregate Function
```sql
CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
```
- function_name: The aggregate function name to be used in SQL statement which must be consistent with the udfNormalFunc name and is also the name of the compiled DLL (.so file).
- library_path: The absolute path of the DLL file including the name of the shared object file (.so). The path must be quoted with single or double quotes.
- OR REPLACE: if the UDF exists, the UDF properties are modified
- function_name: The aggregate function name to be used in the SQL statement
- LANGUAGE 'C|Python': the programming language of the UDF. Now C or Python is supported. If this clause is omitted, C is assumed as the programming language.
- library_path: For C programming language, The absolute path of the DLL file including the name of the shared object file (.so). For Python programming language, the absolute path of the Python UDF script. The path must be quoted with single or double quotes.
- output_type: The output data type, the value is the literal string of the supported TDengine data type.
- buffer_size: The size of the intermediate buffer in bytes. This parameter is optional.
......@@ -41,6 +48,11 @@ CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
```
For example, the following SQL statement modifies the buffer size of existing UDF `l2norm` to 64
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 64;
```
For more information about user-defined functions, see [User-Defined Functions](/develop/udf).
## Manage UDF
......@@ -61,9 +73,9 @@ SHOW FUNCTIONS;
## Call UDF
The function name specified when creating UDF can be used directly in SQL statements, just like builtin functions. For example:
The function name specified when creating UDF can be used directly in SQL statements, just like built-in functions. For example:
```sql
SELECT bit_and(c1,c2) FROM table;
```
The above SQL statement invokes function X for column c1 and c2 on table. You can use query keywords like WHERE with user-defined functions.
The above SQL statement invokes function X for columns c1 and c2 on the table. You can use query keywords like WHERE with user-defined functions.
......@@ -6,18 +6,20 @@ description: "支持用户编码的聚合函数和标量函数,在查询中嵌
在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF(User Defined Function) 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。
TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
用户可以通过 UDF 实现两类函数:标量函数和聚合函数。标量函数对每行数据输出一个值,如求绝对值 abs,正弦函数 sin,字符串拼接函数 concat 等。聚合函数对多行数据进行输出一个值,如求平均数 avg,最大值 max 等。
实现 UDF 时,需要实现规定的接口函数
TDengine 支持通过 C/Python 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。
## 用 C 语言实现 UDF
使用 C 语言实现 UDF 时,需要实现规定的接口函数
- 标量函数需要实现标量接口函数 scalarfn 。
- 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。
- 如果需要初始化,实现 udf_init;如果需要清理工作,实现udf_destroy。
接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。
## 实现标量函数
### 用 C 语言实现标量函数
标量函数实现模板如下
```c
#include "taos.h"
......@@ -49,7 +51,7 @@ int32_t scalarfn_destroy() {
```
scalarfn 为函数名的占位符,需要替换成函数名,如bit_and。
## 实现聚合函数
### 用 C 语言实现聚合函数
聚合函数的实现模板如下
```c
......@@ -100,7 +102,7 @@ int32_t aggfn_destroy() {
```
aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
## 接口函数定义
### C 语言 UDF 接口函数定义
接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀(_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfn,aggfn, udf 需要替换成udf函数名。
......@@ -108,7 +110,7 @@ aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
接口函数参数类型见数据结构定义。
### 标量接口函数
#### 标量函数接口
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
......@@ -118,7 +120,7 @@ aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
- inputDataBlock: 输入的数据块
- resultColumn: 输出列
### 聚合接口函数
#### 聚合函数接口
`int32_t aggfn_start(SUdfInterBuf *interBuf)`
......@@ -135,7 +137,7 @@ aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
- result:最终结果。
### UDF 初始化和销毁
#### 初始化和销毁接口
`int32_t udf_init()`
`int32_t udf_destroy()`
......@@ -143,7 +145,7 @@ aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。
其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作,无需定义udf_init函数。如果没有清理工作,无需定义udf_destroy函数。
## UDF 数据结构
### C 语言 UDF 数据结构
```c
typedef struct SUdfColumnMeta {
int16_t type;
......@@ -201,7 +203,7 @@ typedef struct SUdfInterBuf {
为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。
## 编译 UDF
### 编译 C UDF
用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。
......@@ -213,12 +215,9 @@ gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so
这样就准备好了动态链接库 libbitand.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。
## 管理和使用UDF
编译好的UDF,还需要将其加入到系统才能被正常的SQL调用。关于如何管理和使用UDF,参见[UDF使用说明](../12-taos-sql/26-udf.md)
### C UDF 示例代码
## 示例代码
### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
#### 标量函数示例 [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。
......@@ -231,7 +230,7 @@ bit_add 实现多列的按位与功能。如果只有一列,返回这一列。
</details>
### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
#### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
......@@ -244,7 +243,7 @@ l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先
</details>
### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/max_vol.c)
#### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/max_vol.c)
max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值
......@@ -269,3 +268,124 @@ select max_vol(vol1,vol2,vol3,deviceid) from battery;
```
</details>
## 用 Python 语言实现 UDF
使用 Python 语言实现 UDF 时,需要实现规定的接口函数
- 标量函数需要实现标量接口函数 process 。
- 聚合函数需要实现聚合接口函数 start ,reduce ,finish。
- 如果需要初始化,实现 init;如果需要清理工作,实现 destroy。
### 用 Python 实现标量函数
标量函数实现模版如下
```Python
def init():
# initialization
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
```
### 用 Python 实现聚合函数
聚合函数实现模版如下
```Python
def init():
#initialization
def destroy():
#destroy
def start() -> bytes:
#return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
#return obj of type outputtype
```
### Python UDF 接口函数定义
#### 标量函数接口
```Python
def process(input: datablock) -> tuple[output_type]:
```
- input:datablock 类似二维矩阵,通过成员方法 data(row,col)返回位于 row 行,col 列的 python 对象
- 返回值是一个 Python 对象元组,每个元素类型为输出类型。
#### 聚合函数接口
```Python
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
```
首先调用 start 生成最初结果 buffer,然后输入数据会被分为多个行数据块,对每个数据块 inputs 和当前中间结果 buf 调用 reduce,得到新的中间结果,最后再调用 finish 从中间结果 buf 产生最终输出,最终输出只能含 0 或 1 条数据。
#### 初始化和销毁接口
```Python
def init()
def destroy()
```
其中 init 完成初始化工作。 destroy 完成清理工作。如果没有初始化工作,无需定义 init 函数。如果没有清理工作,无需定义 destroy 函数。
### Python 和 TDengine之间的数据类型映射
下表描述了TDengine SQL数据类型和Python数据类型的映射。任何类型的NULL值都映射成Python的None值。
| **TDengine SQL数据类型** | **Python数据类型** |
| :-----------------------: | ------------ |
|TINYINT / SMALLINT / INT / BIGINT | int |
|TINYINT UNSIGNED / SMALLINT UNSIGNED / INT UNSIGNED / BIGINT UNSIGNED | int |
|FLOAT / DOUBLE | float |
|BOOL | bool |
|BINARY / VARCHAR / NCHAR | bytes|
|TIMESTAMP | int |
|JSON and other types | 不支持 |
### Python UDF 环境的安装
1. 安装 taospyudf 包。此包执行Python UDF程序。
```bash
sudo pip install taospyudf
ldconfig
```
2. 如果 Python UDF 程序执行时,通过 PYTHONPATH 引用其它的包,可以设置 taos.cfg 的 UdfdLdLibPath 变量为PYTHONPATH的内容
### Python UDF 示例代码
#### 标量函数示例 [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
pybitand 实现多列的按位与功能。如果只有一列,返回这一列。pybitand 忽略空值。
<details>
<summary>pybitand.py</summary>
```Python
{{#include tests/script/sh/pybitand.py}}
```
</details>
#### 聚合函数示例 [pyl2norm](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pyl2norm.py)
pyl2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
<details>
<summary>pyl2norm.py</summary>
```c
{{#include tests/script/sh/pyl2norm.py}}
```
</details>
## 管理和使用 UDF
在使用 UDF 之前需要先将其加入到 TDengine 系统中。关于如何管理和使用 UDF,请参考[管理和使用 UDF](../12-taos-sql/26-udf.md)
......@@ -71,8 +71,8 @@ database_option: {
- 0:表示可以创建多张超级表。
- 1:表示只可以创建一张超级表。
- STT_TRIGGER:表示落盘文件触发文件合并的个数。默认为 1,范围 1 到 16。对于少表高频场景,此参数建议使用默认配置,或较小的值;而对于多表低频场景,此参数建议配置较大的值。
- TABLE_PREFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。
- TABLE_SUFFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的后缀的长度
- TABLE_PREFIX:当其为正值时,在决定把一个表分配到哪个 vgroup 时要忽略表名中指定长度的前缀;当其为负值时,在决定把一个表分配到哪个 vgroup 时只使用表名中指定长度的前缀;例如,假定表名为 "v30001",当 TSDB_PREFIX = 2 时 使用 "0001" 来决定分配到哪个 vgroup ,当 TSDB_PREFIX = -2 时使用 "v3" 来决定分配到哪个 vgroup
- TABLE_SUFFIX:当其为正值时,在决定把一个表分配到哪个 vgroup 时要忽略表名中指定长度的后缀;当其为负值时,在决定把一个表分配到哪个 vgroup 时只使用表名中指定长度的后缀;例如,假定表名为 "v30001",当 TSDB_SUFFIX = 2 时 使用 "v300" 来决定分配到哪个 vgroup ,当 TSDB_SUFFIX = -2 时使用 "01" 来决定分配到哪个 vgroup
- TSDB_PAGESIZE:一个 VNODE 中时序数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB到 16 MB。
- WAL_RETENTION_PERIOD: 为了数据订阅消费,需要WAL日志文件额外保留的最大时长策略。WAL日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 0,表示无需为订阅保留。新建订阅,应先设置恰当的时长策略。
- WAL_RETENTION_SIZE:为了数据订阅消费,需要WAL日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0,表示累计大小无上限。
......
......@@ -120,6 +120,10 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 5 | create_time | TIMESTAMP | 创建时间 |
| 6 | code_len | INT | 代码长度 |
| 7 | bufsize | INT | buffer 大小 |
| 8 | func_language | BINARY(31) | 自定义函数编程语言 |
| 9 | func_body | BINARY(16384) | 函数体定义 |
| 10 | func_version | INT | 函数版本号。初始版本为0,每次替换更新,版本号加1。|
## INS_INDEXES
......
......@@ -13,27 +13,34 @@ description: 使用 UDF 的详细指南
- 创建标量函数
```sql
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type;
CREATE [OR REPLACE] FUNCTION function_name AS library_path OUTPUTTYPE output_type [LANGUAGE 'C|Python'];
```
- function_name:标量函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udf 的实际名称一致;
- library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- OR REPLACE: 如果函数已经存在,会修改已有的函数属性。
- function_name:标量函数未来在 SQL 中被调用时的函数名;
- LANGUAGE 'C|Python':函数编程语言,目前支持C语言和Python语言。 如果这个从句忽略,编程语言是C语言
- library_path:如果编程语言是C,路径是包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件)。如果编程语言是Python,路径是包含 UDF 函数实现的Python文件路径。这个路径需要用英文单引号或英文双引号括起来;
- output_type:此函数计算结果的数据类型名称;
例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF:
例如,如下语句可以把 libbitand.so 创建为系统中可用的 UDF:
```sql
CREATE FUNCTION bit_and AS "/home/taos/udf_example/libbitand.so" OUTPUTTYPE INT;
```
例如,使用以下语句可以修改已经定义的 bit_and 函数,输出类型是 BIGINT,使用Python语言实现。
```sql
CREATE OR REPLACE FUNCTION bit_and AS "/home/taos/udf_example/bit_and.py" OUTPUTTYPE BIGINT LANGUAGE 'Python';
```
- 创建聚合函数:
```sql
CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ];
CREATE [OR REPLACE] AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [ BUFSIZE buffer_size ] [LANGUAGE 'C|Python'];
```
- OR REPLACE: 如果函数已经存在,会修改已有的函数属性。
- function_name:聚合函数未来在 SQL 中被调用时的函数名,必须与函数实现中 udfNormalFunc 的实际名称一致;
- library_path:包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件),这个路径需要用英文单引号或英文双引号括起来;
- output_type:此函数计算结果的数据类型,与上文中 udfNormalFunc 的 itype 参数不同,这里不是使用数字表示法,而是直接写类型名称即可;
- LANGUAGE 'C|Python':函数编程语言,目前支持C语言和Python语言。
- library_path:如果编程语言是C,路径是包含 UDF 函数实现的动态链接库的库文件绝对路径(指的是库文件在当前客户端所在主机上的保存路径,通常是指向一个 .so 文件)。如果编程语言是Python,路径是包含 UDF 函数实现的Python文件路径。这个路径需要用英文单引号或英文双引号括起来;;
- output_type:此函数计算结果的数据类型名称;
- buffer_size:中间计算结果的缓冲区大小,单位是字节。如果不使用可以不设置。
例如,如下语句可以把 libl2norm.so 创建为系统中可用的 UDF:
......@@ -41,6 +48,11 @@ CREATE AGGREGATE FUNCTION function_name AS library_path OUTPUTTYPE output_type [
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 8;
```
例如,使用以下语句可以修改已经定义的 l2norm 函数的缓冲区大小为64。
```sql
CREATE AGGREGATE FUNCTION l2norm AS "/home/taos/udf_example/libl2norm.so" OUTPUTTYPE DOUBLE bufsize 64;
```
关于如何开发自定义函数,请参考 [UDF使用说明](/develop/udf)
## 管理 UDF
......
......@@ -27,7 +27,7 @@ description: "TDengine 3.0 版本的语法变更说明"
| - | :------- | :-------- | :------- |
| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
| 3 | ALTER DATABASE | 调整 | <p>废除</p><ul><li>QUORUM:写入需要的副本确认数。3.0 版本默认行为是强一致性,且不支持修改为弱一致性。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。</li></ul><p>新增</p><ul><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。</li><li>WAL_RETENTION_PERIOD:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。</li><li>WAL_RETENTION_SIZE:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。</li></ul><p>调整</p><ul><li>REPLICA:3.0.0版本暂不支持修改。</li><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 3 | ALTER DATABASE | 调整 | <p>废除</p><ul><li>QUORUM:写入需要的副本确认数。3.0 版本默认行为是强一致性,且不支持修改为弱一致性。</li><li>BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。</li><li>UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。</li><li>CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。</li><li>COMP:3.0版本暂不支持修改。</li></ul><p>新增</p><ul><li>CACHEMODEL:表示是否在内存中缓存子表的最近数据。</li><li>CACHESIZE:表示缓存子表最近数据的内存大小。</li><li>WAL_FSYNC_PERIOD:代替原FSYNC参数。</li><li>WAL_LEVEL:代替原WAL参数。</li><li>WAL_RETENTION_PERIOD:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。</li><li>WAL_RETENTION_SIZE:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。</li></ul><p>调整</p><ul><li>KEEP:3.0版本新增支持带单位的设置方式。</li></ul>
| 4 | ALTER STABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改超级表的注释。</li></ul>
| 5 | ALTER TABLE | 调整 | 废除<ul><li>CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。<br/>新增</li><li>RENAME TAG:代替原CHANGE TAG子句。</li><li>COMMENT:修改表的注释。</li><li>TTL:修改表的生命周期。</li></ul>
| 6 | ALTER USER | 调整 | 废除<ul><li>PRIVILEGE:修改用户权限。3.0版本使用GRANT和REVOKE来授予和回收权限。<br/>新增</li><li>ENABLE:启用或停用此用户。</li><li>SYSINFO:修改用户是否可查看系统信息。</li></ul>
......
......@@ -225,7 +225,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
......
......@@ -2084,6 +2084,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return -1;
}
mndSortVnodeGid(&newVgroup);
{
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
......@@ -2162,6 +2164,7 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
}
// alter hash range
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
newVg1.vgId = maxVgId;
......@@ -2170,25 +2173,30 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
newVg2.vgId = maxVgId;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
// adjust vgroup replica
if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
}
} else {
pRaw = mndVgroupActionEncode(&newVg1);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
pRaw = mndVgroupActionEncode(&newVg2);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
}
pRaw = mndVgroupActionEncode(pVgroup);
if (pRaw == NULL) goto _OVER;
......@@ -2245,7 +2253,12 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
if (pDb == NULL) goto _OVER;
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
if (code != 0) {
mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name);
goto _OVER;
}
mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name);
_OVER:
mndReleaseVgroup(pMnode, pVgroup);
......
......@@ -343,6 +343,7 @@ struct SVnodeCfg {
SVnodeStats vndStats;
uint32_t hashBegin;
uint32_t hashEnd;
bool hashChange;
int16_t sttTrigger;
int16_t hashPrefix;
int16_t hashSuffix;
......
......@@ -134,6 +134,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
......@@ -249,6 +250,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
......
......@@ -198,6 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
info.config.vgId = pReq->dstVgId;
info.config.hashBegin = pReq->hashBegin;
info.config.hashEnd = pReq->hashEnd;
info.config.hashChange = true;
info.config.walCfg.vgId = pReq->dstVgId;
SSyncCfg *pCfg = &info.config.syncCfg;
......
......@@ -1453,11 +1453,30 @@ int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t version) {
int32_t code = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
pVnode->config.hashBegin, pVnode->config.hashEnd, version);
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
return code;
}
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, vnode management handle msgType:alter-confirm, alter replica confim msg is processed",
TD_VID(pVnode));
vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
int32_t code = TSDB_CODE_SUCCESS;
if (!pVnode->config.hashChange) {
goto _exit;
}
code = vnodeConsolidateAlterHashRange(pVnode, version);
pVnode->config.hashChange = false;
_exit:
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->code = code;
pRsp->pCont = NULL;
pRsp->contLen = 0;
......
......@@ -93,6 +93,19 @@ endi
print =============== step4: split
print split vgroup 2
sql split vgroup 2
$wt = 0
stepwt1:
$wt = $wt + 1
sleep 1000
if $wt == 200 then
print ====> split vgroup not completed!
return -1
endi
sql show transactions
if $rows != 0 then
print wait 1 seconds to alter
goto stepwt1
endi
print =============== step5: check split result
sql show d1.tables
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册