提交 72516345 编写于 作者: D dapan1121

Merge branch '3.0' into feature/qnode

......@@ -35,7 +35,7 @@ endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(example)
add_subdirectory(examples/c)
# docs
add_subdirectory(docs)
......
......@@ -46,7 +46,7 @@ ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/w /D_WIN32 /Zi")
SET(COMMON_FLAGS "/w /D_WIN32 /DWIN32 /Zi")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
......
---
sidebar_label: SQL 函数
title: SQL 函数
sidebar_label: 函数
title: 函数
toc_max_heading_level: 4
---
......@@ -20,7 +20,7 @@ toc_max_heading_level: 4
**返回结果类型**:如果输入值为整数,输出值是 UBIGINT 类型。如果输入值是 FLOAT/DOUBLE 数据类型,输出值是 DOUBLE 数据类型。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -38,7 +38,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -56,7 +56,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -75,7 +75,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -94,7 +94,7 @@ SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:与指定列的原始数据类型一致。例如,如果指定列的原始数据类型为 Float,那么返回的数据类型也为 Float;如果指定列的原始数据类型为 Double,那么返回的数据类型也为 Double。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列,无论 tag 列的类型是什么类型。
**适用数据类型**数值类型。
**适用于**: 普通表、超级表。
......@@ -115,7 +115,7 @@ SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -142,7 +142,7 @@ SELECT FLOOR(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -161,7 +161,7 @@ SELECT FLOOR(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -190,7 +190,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -208,7 +208,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -226,7 +226,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -248,7 +248,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:INT。如果输入值为NULL,输出值为NULL。
**适用数据类型**输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -262,9 +262,9 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:字符串连接函数。
**返回结果类型**:如果所有参数均为BINARY类型,则结果类型为BINARY。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。
**返回结果类型**:如果所有参数均为 VARCHAR 类型,则结果类型为 VARCHAR。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。
**适用数据类型**BINARY, NCHAR。不能应用在 TAG 列。 该函数最小参数个数为2个,最大参数个数为8个。
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为2个,最大参数个数为8个。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -279,9 +279,9 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:带分隔符的字符串连接函数。
**返回结果类型**:如果所有参数均为BINARY类型,则结果类型为BINARY。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。如果separator值不为NULL,其他输入为NULL,输出为空串。
**返回结果类型**:如果所有参数均为VARCHAR类型,则结果类型为VARCHAR。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。如果separator值不为NULL,其他输入为NULL,输出为空串。
**适用数据类型**BINARY, NCHAR。不能应用在 TAG 列。 该函数最小参数个数为3个,最大参数个数为9个。
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为3个,最大参数个数为9个。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -298,7 +298,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:INT。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -315,7 +315,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -332,7 +332,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -349,7 +349,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -366,7 +366,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。输入参数pos可以为正数,也可以为负数。如果pos是正数,表示开始位置从字符串开头正数计算。如果pos为负数,表示开始位置从字符串结尾倒数计算。如果输入参数len被忽略,返回的子串包含从pos开始的整个字串。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。输入参数pos可以为正数,也可以为负数。如果pos是正数,表示开始位置从字符串开头正数计算。如果pos为负数,表示开始位置从字符串结尾倒数计算。如果输入参数len被忽略,返回的子串包含从pos开始的整个字串。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -383,7 +383,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -400,7 +400,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
SELECT CAST(expression AS type_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:数据类型转换函数,输入参数 expression 支持普通列、常量、标量函数及它们之间的四则运算,不支持 tag 列,只适用于 select 子句中。
**功能说明**:数据类型转换函数,输入参数 expression 支持普通列、常量、标量函数及它们之间的四则运算,只适用于 select 子句中。
**返回结果类型**:CAST 中指定的类型(type_name),可以是 BIGINT、BIGINT UNSIGNED、BINARY、VARCHAR、NCHAR和TIMESTAMP。
......@@ -423,7 +423,7 @@ SELECT TO_ISO8601(ts_val | ts_col) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:将 UNIX 时间戳转换成为 ISO8601 标准的日期时间格式,并附加客户端时区信息。
**返回结果数据类型**BINARY 类型。
**返回结果数据类型**VARCHAR 类型。
**适用数据类型**:UNIX 时间戳常量或是 TIMESTAMP 类型的列
......@@ -462,7 +462,7 @@ SELECT TO_UNIXTIMESTAMP(datetime_string | ts_col) FROM { tb_name | stb_name } [W
**返回结果数据类型**:长整型 INT64。
**应用字段**:字符串常量或是 BINARY/NCHAR 类型的列。
**应用字段**:字符串常量或是 VARCHAR/NCHAR 类型的列。
**适用于**:表、超级表。
......@@ -549,7 +549,7 @@ SELECT TIMEZONE() FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:返回客户端当前时区信息。
**返回结果数据类型**BINARY 类型。
**返回结果数据类型**VARCHAR 类型。
**应用字段**:无
......@@ -668,7 +668,7 @@ SELECT LEASTSQUARES(field_name, start_val, step_val) FROM tb_name [WHERE clause]
SELECT MODE(field_name) FROM tb_name [WHERE clause];
```
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出空。不能匹配标签、时间戳输出。
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出空。
**返回数据类型**:同应用的字段。
......@@ -808,6 +808,25 @@ SELECT BOTTOM(field_name, K) FROM { tb_name | stb_name } [WHERE clause];
- 系统同时返回该记录关联的时间戳列;
- 限制:BOTTOM 函数不支持 FILL 子句。
### FIRST
```
SELECT FIRST(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:统计表/超级表中某列的值最先写入的非 NULL 值。
**返回数据类型**:同应用的字段。
**适用数据类型**:所有字段。
**适用于**:表和超级表。
**使用说明**:
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;
- 如果结果集中所有列全部为 NULL 值,则不返回结果。
### INTERP
......@@ -919,25 +938,6 @@ SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause];
**使用说明***P*值取值范围 0≤*P*≤100,为 0 的时候等同于 MIN,为 100 的时候等同于 MAX。
### FIRST
```
SELECT FIRST(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:统计表/超级表中某列的值最先写入的非 NULL 值。
**返回数据类型**:同应用的字段。
**适用数据类型**:所有字段。
**适用于**:表和超级表。
**使用说明**:
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;
- 如果结果集中所有列全部为 NULL 值,则不返回结果。
### TAIL
......@@ -1005,7 +1005,7 @@ SELECT UNIQUE(field_name) FROM {tb_name | stb_name} [WHERE clause];
**返回结果类型**: 输入列如果是整数类型返回值为长整型 (int64_t),浮点数返回值为双精度浮点数(Double)。无符号整数类型返回值为无符号长整型(uint64_t)。 返回结果中同时带有每行记录对应的时间戳。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在标签之上
**适用数据类型**数值类型
**嵌套子查询支持**: 适用于内层查询和外层查询。
......@@ -1076,7 +1076,7 @@ SELECT IRATE(field_name) FROM tb_name WHERE clause;
**返回结果类型**: 返回双精度浮点数类型。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型上;在超级表查询中使用时,不能应用在标签之上
**适用数据类型**数值类型
**嵌套子查询支持**: 适用于内层查询和外层查询。
......@@ -1124,7 +1124,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau
**返回结果类型**:整形。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上
**适用数据类型**数值类型
**嵌套子查询支持**:不支持应用在子查询上。
......@@ -1152,7 +1152,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**返回结果类型**:整形。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上
**适用数据类型**数值类型
**嵌套子查询支持**:不支持应用在子查询上。
......
......@@ -35,8 +35,8 @@ TDengine 支持 `UNION ALL` 和 `UNION` 操作符。UNION ALL 将查询返回的
| --- | :---------------: | -------------------------------------------------------------------- | -------------------- |
| 1 | = | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 相等 |
| 2 | <\>, != | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 不相等 |
| 3 | \>, \< | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于,小于 |
| 4 | \>=, \<= | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于等于,小于等于 |
| 3 | \>, < | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于,小于 |
| 4 | \>=, <= | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于等于,小于等于 |
| 5 | IS [NOT] NULL | 所有类型 | 是否为空值 |
| 6 | [NOT] BETWEEN AND | 除 BOOL、BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 闭区间比较 |
| 7 | IN | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 与列表内的任意值相等 |
......
......@@ -114,7 +114,6 @@ TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#
<summary>订阅和消费</summary>
```c
{{#include examples/c/subscribe.c}}
```
</details>
......
......@@ -18,21 +18,22 @@ TDengine 能够与开源数据可视化系统 [Grafana](https://www.grafana.com/
## 配置 Grafana
TDengine 的 Grafana 插件托管在 GitHub,可从 <https://github.com/taosdata/grafanaplugin/releases/latest> 下载,当前最新版本为 3.1.4。
推荐使用 [`grafana-cli` 命令行工具](https://grafana.com/docs/grafana/latest/administration/cli/) 进行插件安装。
使用 [`grafana-cli` 命令行工具](https://grafana.com/docs/grafana/latest/administration/cli/) 进行插件[安装](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation)。
```bash
sudo -u grafana grafana-cli \
--pluginUrl https://github.com/taosdata/grafanaplugin/releases/download/v3.1.7/tdengine-datasource-3.1.7.zip \
plugins install tdengine-datasource
grafana-cli plugins install tdengine-datasource
# with sudo
sudo -u grafana grafana-cli plugins install tdengine-datasource
```
或者下载到本地并解压到 Grafana 插件目录。
或者从 [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) 或 [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) 下载 .zip 文件到本地并解压到 Grafana 插件目录。命令行下载示例如下:
```bash
GF_VERSION=3.1.7
GF_VERSION=3.2.2
# from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana
wget -O tdengine-datasource-$GF_VERSION.zip https://grafana.com/api/plugins/tdengine-datasource/versions/$GF_VERSION/download
```
以 CentOS 7.2 操作系统为例,将插件包解压到 /var/lib/grafana/plugins 目录下,重新启动 grafana 即可。
......@@ -41,28 +42,17 @@ wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/td
sudo unzip tdengine-datasource-$GF_VERSION.zip -d /var/lib/grafana/plugins/
```
:::note
3.1.6 和更早版本未签名,会在 Grafana 7.3+ / 8.x 版本签名检查时失败导致无法加载插件,需要在 grafana.ini 文件中修改配置如下:
```ini
[plugins]
allow_loading_unsigned_plugins = tdengine-datasource
```
:::
在 Docker 环境下,可以使用如下的环境变量设置自动安装并设置 TDengine 插件:
如果 Grafana 在 Docker 环境下运行,可以使用如下的环境变量设置自动安装 TDengine 数据源插件:
```bash
GF_INSTALL_PLUGINS=https://github.com/taosdata/grafanaplugin/releases/download/v3.1.4/tdengine-datasource-3.1.4.zip;tdengine-datasource
GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=tdengine-datasource
GF_INSTALL_PLUGINS=tdengine-datasource
```
## 使用 Grafana
### 配置数据源
用户可以直接通过 http://localhost:3000 的网址,登录 Grafana 服务器(用户名/密码:admin/admin),通过左侧 `Configuration -> Data Sources` 可以添加数据源,如下图所示:
用户可以直接通过 <http://localhost:3000> 的网址,登录 Grafana 服务器(用户名/密码:admin/admin),通过左侧 `Configuration -> Data Sources` 可以添加数据源,如下图所示:
![TDengine Database Grafana plugin add data source](./add_datasource1.webp)
......@@ -74,7 +64,7 @@ GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=tdengine-datasource
![TDengine Database Grafana plugin add data source](./add_datasource3.webp)
- Host: TDengine 集群中提供 REST 服务 (在 2.4 之前由 taosd 提供, 从 2.4 开始由 taosAdapter 提供)的组件所在服务器的 IP 地址与 TDengine REST 服务的端口号(6041),默认 http://localhost:6041
- Host: TDengine 集群中提供 REST 服务 (在 2.4 之前由 taosd 提供, 从 2.4 开始由 taosAdapter 提供)的组件所在服务器的 IP 地址与 TDengine REST 服务的端口号(6041),默认 <http://localhost:6041>
- User:TDengine 用户名。
- Password:TDengine 用户密码。
......
此差异已折叠。
......@@ -35,8 +35,8 @@ TDengine provides 2 set operators: `UNION ALL` and `UNION`. `UNION ALL` combines
| --- | :---------------: | ------------------------------------------------------------------- | ----------------------------------------------- |
| 1 | = | Except for BLOB, MEDIUMBLOB and JSON | Equal |
| 2 | <\>, != | Except for BLOB, MEDIUMBLOB, JSON and primary key of timestamp type | Not equal |
| 3 | \>, \< | Except for BLOB, MEDIUMBLOB and JSON | Greater than, less than |
| 4 | \>=, \<= | Except for BLOB, MEDIUMBLOB and JSON | Greater than or equal to, less than or equal to |
| 3 | \>, < | Except for BLOB, MEDIUMBLOB and JSON | Greater than, less than |
| 4 | \>=, <= | Except for BLOB, MEDIUMBLOB and JSON | Greater than or equal to, less than or equal to |
| 5 | IS [NOT] NULL | Any types | Is NULL or NOT |
| 6 | [NOT] BETWEEN AND | Except for BLOB, MEDIUMBLOB and JSON | In a value range or not |
| 7 | IN | Except for BLOB, MEDIUMBLOB, JSON and primary key of timestamp type | In a list of values or not |
......
......@@ -114,7 +114,6 @@ This section shows sample code for standard access methods to TDengine clusters
<summary>Subscribe and consume</summary>
```c
{{#include examples/c/subscribe.c}}
```
</details>
......
......@@ -9,7 +9,8 @@ You can learn more about using the TDengine plugin on [GitHub](https://github.co
## Prerequisites
In order for Grafana to add the TDengine data source successfully, the following preparation is required:
In order for Grafana to add the TDengine data source successfully, the following preparations are required:
1. The TDengine cluster is deployed and functioning properly
2. taosAdapter is installed and running properly. Please refer to the taosAdapter manual for details.
......@@ -19,41 +20,34 @@ TDengine currently supports Grafana versions 7.0 and above. Users can go to the
## Configuring Grafana
You can download The Grafana plugin for TDengine from <https://github.com/taosdata/grafanaplugin/releases/latest>. The current latest version is 3.1.4.
Recommend using the [``grafana-cli`` command-line tool](https://grafana.com/docs/grafana/latest/administration/cli/) for plugin installation.
Follow the installation steps in [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) with the [``grafana-cli`` command-line tool](https://grafana.com/docs/grafana/latest/administration/cli/) for plugin installation.
```bash
sudo -u grafana grafana-cli \
--pluginUrl https://github.com/taosdata/grafanaplugin/releases/download/v3.1.4/tdengine-datasource-3.1.4.zip \
plugins install tdengine-datasource
grafana-cli plugins install tdengine-datasource
# with sudo
sudo -u grafana grafana-cli plugins install tdengine-datasource
```
Or download it locally and extract it to the Grafana plugin directory.
Alternatively, you can manually download the .zip file from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and unpack it into your grafana plugins directory.
```bash
GF_VERSION=3.1.4
GF_VERSION=3.2.2
# from GitHub
wget https://github.com/taosdata/grafanaplugin/releases/download/v$GF_VERSION/tdengine-datasource-$GF_VERSION.zip
# from Grafana
wget -O tdengine-datasource-$GF_VERSION.zip https://grafana.com/api/plugins/tdengine-datasource/versions/$GF_VERSION/download
```
In CentOS 7.2 for example, extract the plugin package to /var/lib/grafana/plugins directory, and restart grafana.
Take CentOS 7.2 for example, extract the plugin package to /var/lib/grafana/plugins directory, and restart grafana.
```bash
sudo unzip tdengine-datasource-$GF_VERSION.zip -d /var/lib/grafana/plugins/
```
Grafana versions 7.3+ / 8.x do signature checks on plugins, so you also need to add the following line to the grafana.ini file to use the plugin correctly.
```ini
[plugins]
allow_loading_unsigned_plugins = tdengine-datasource
```
The TDengine plugin can be automatically installed and set up using the following environment variable settings in a Docker environment.
If Grafana is running in a Docker environment, the TDengine plugin can be automatically installed and set up using the following environment variable settings:
```bash
GF_INSTALL_PLUGINS=https://github.com/taosdata/grafanaplugin/releases/download/v3.1.4/tdengine-datasource-3.1.4.zip;tdengine- datasource
GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=tdengine-datasource
GF_INSTALL_PLUGINS=tdengine-datasource
```
## Using Grafana
......
add_executable(tmq "")
add_executable(tstream "")
add_executable(demoapi "")
target_sources(tmq
PRIVATE
"src/tmq.c"
)
target_sources(tstream
PRIVATE
"src/tstream.c"
)
target_sources(demoapi
PRIVATE
"src/demoapi.c"
)
target_link_libraries(tmq
taos_static
)
target_link_libraries(tstream
taos_static
)
target_link_libraries(demoapi
taos_static
)
target_include_directories(tmq
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(tstream
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(demoapi
PUBLIC "${TD_SOURCE_DIR}/include/client"
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
......@@ -3,20 +3,70 @@ PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(sml schemaless.c)
TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
# ADD_EXECUTABLE(demo apitest.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
#ADD_EXECUTABLE(sml schemaless.c)
#TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
#ADD_EXECUTABLE(subscribe subscribe.c)
#TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable(tmq "")
add_executable(stream_demo "")
add_executable(demoapi "")
target_sources(tmq
PRIVATE
"tmq.c"
)
target_sources(stream_demo
PRIVATE
"stream_demo.c"
)
target_sources(demoapi
PRIVATE
"demoapi.c"
)
target_link_libraries(tmq
taos_static
)
target_link_libraries(stream_demo
taos_static
)
target_link_libraries(demoapi
taos_static
)
target_include_directories(tmq
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(stream_demo
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(demoapi
PUBLIC "${TD_SOURCE_DIR}/include/client"
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
ENDIF ()
IF (TD_DARWIN)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo demo.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(demo demo.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
ENDIF ()
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../../../include/client/taos.h" // include TDengine header file
int nTotalRows;
void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
//for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields);
// puts(buf);
//}
} else {
while ((row = taos_fetch_row(res))) {
char buf[4096] = {0};
taos_print_row(buf, row, fields, num_fields);
puts(buf);
nRows++;
}
}
nTotalRows += nRows;
printf("%d rows consumed.\n", nRows);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res, *(int*)param);
}
void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
actual++;
}
if (actual != expected) {
printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
} else {
printf("line %d: %d rows consumed as expected\n", line, actual);
}
}
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql);
taos_free_result(res);
}
void run_test(TAOS* taos) {
do_query(taos, "drop database if exists test;");
usleep(100000);
do_query(taos, "create database test;");
usleep(100000);
do_query(taos, "use test;");
usleep(100000);
do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
do_query(taos, "create table t0 using meters tags(0);");
do_query(taos, "create table t1 using meters tags(1);");
do_query(taos, "create table t2 using meters tags(2);");
do_query(taos, "create table t3 using meters tags(3);");
do_query(taos, "create table t4 using meters tags(4);");
do_query(taos, "create table t5 using meters tags(5);");
do_query(taos, "create table t6 using meters tags(6);");
do_query(taos, "create table t7 using meters tags(7);");
do_query(taos, "create table t8 using meters tags(8);");
do_query(taos, "create table t9 using meters tags(9);");
do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription
usleep(1000000);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 18);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription
taos_unsubscribe(tsub, 1);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
// don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// single meter subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 5);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
taos_unsubscribe(tsub, 0);
}
int main(int argc, char *argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
const char* sql = "select * from meters;";
const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) {
host = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-u=", 3) == 0) {
user = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-p=", 3) == 0) {
passwd = argv[i] + 3;
continue;
}
if (strcmp(argv[i], "-sync") == 0) {
async = 0;
continue;
}
if (strcmp(argv[i], "-restart") == 0) {
restart = 1;
continue;
}
if (strcmp(argv[i], "-single") == 0) {
sql = "select * from t0;";
topic = "test-single";
continue;
}
if (strcmp(argv[i], "-nokeep") == 0) {
keep = 0;
continue;
}
if (strncmp(argv[i], "-sql=", 5) == 0) {
sql = argv[i] + 5;
topic = "test-custom";
continue;
}
if (strcmp(argv[i], "-test") == 0) {
test = 1;
continue;
}
if (strcmp(argv[i], "-block-fetch") == 0) {
blockFetch = 1;
continue;
}
}
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
}
if (test) {
run_test(taos);
taos_close(taos);
exit(0);
}
taos_select_db(taos, "test");
TAOS_SUB* tsub = NULL;
if (async) {
// create an asynchronized subscription, the callback function will be called every 1s
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
// create an synchronized subscription, need to call 'taos_consume' manually
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
printf("failed to create subscription.\n");
exit(0);
}
if (async) {
getchar();
} else while(1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
}
printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep);
taos_close(taos);
return 0;
}
......@@ -24,6 +24,7 @@ static void msg_process(TAOS_RES* msg) {
char buf[1024];
/*memset(buf, 0, 1024);*/
printf("topic: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
......@@ -165,7 +166,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -191,21 +191,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return;
}
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
if (cnt >= 2) break;
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
......
......@@ -144,8 +144,8 @@ DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *nam
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......@@ -269,6 +269,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
......
......@@ -97,6 +97,7 @@ extern char *qtypeStr[];
#undef TD_DEBUG_PRINT_ROW
#undef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
#undef TD_DEBUG_PRINT_TAG
#ifdef __cplusplus
}
......
......@@ -18,6 +18,7 @@
#include "os.h"
#include "talgo.h"
#include "tarray.h"
#include "tencode.h"
#include "ttypes.h"
#include "tutil.h"
......@@ -29,6 +30,7 @@ extern "C" {
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
......@@ -39,32 +41,37 @@ typedef struct STag STag;
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SColVal
#define ColValNONE ((SColVal){.type = COL_VAL_NONE, .nData = 0, .pData = NULL})
#define ColValNULL ((SColVal){.type = COL_VAL_NULL, .nData = 0, .pData = NULL})
#define ColValDATA(nData, pData) ((SColVal){.type = COL_VAL_DATA, .nData = (nData), .pData = (pData)})
// STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// STSRowBuilder
#if 0
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema);
void tTSRowBuilderClear(STSRowBuilder *pBuilder);
void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
#endif
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData);
bool tTagGet(const STag *pTag, STagVal *pTagVal);
char* tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
void debugCheckTags(STag *pTag); // TODO: remove
// STRUCT =================
struct STColumn {
......@@ -87,7 +94,9 @@ struct STSchema {
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
struct STSRow2 {
TSKEY ts;
uint8_t flags;
......@@ -110,20 +119,60 @@ struct STSRowBuilder {
STSRow2 row;
};
typedef enum { COL_VAL_NONE = 0, COL_VAL_NULL = 1, COL_VAL_DATA = 2 } EColValT;
struct SColVal {
EColValT type;
struct SValue {
union {
int8_t i8; // TSDB_DATA_TYPE_BOOL||TSDB_DATA_TYPE_TINYINT
uint8_t u8; // TSDB_DATA_TYPE_UTINYINT
int16_t i16; // TSDB_DATA_TYPE_SMALLINT
uint16_t u16; // TSDB_DATA_TYPE_USMALLINT
int32_t i32; // TSDB_DATA_TYPE_INT
uint32_t u32; // TSDB_DATA_TYPE_UINT
int64_t i64; // TSDB_DATA_TYPE_BIGINT
uint64_t u64; // TSDB_DATA_TYPE_UBIGINT
TSKEY ts; // TSDB_DATA_TYPE_TIMESTAMP
float f; // TSDB_DATA_TYPE_FLOAT
double d; // TSDB_DATA_TYPE_DOUBLE
struct {
uint32_t nData;
uint8_t *pData;
};
};
};
struct SColVal {
int16_t cid;
int8_t isNone;
int8_t isNull;
SValue value;
};
#pragma pack(push, 1)
struct STagVal {
union {
int16_t cid;
char *pKey;
};
int8_t type;
union {
int64_t i64;
struct {
uint32_t nData;
uint8_t *pData;
};
};
};
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x20)
struct STag {
int8_t flags;
int16_t len;
int16_t nTag;
int32_t ver;
int8_t idx[];
};
#pragma pack(pop)
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
......@@ -366,109 +415,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols);
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update,
TDRowVerT maxVer);
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
* |<----- header ----->|<--------------------------- body -------------------------------->|
* +----------+----------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SKVRow;
typedef struct {
int16_t colId;
uint16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) taosMemoryFreeClear(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r))
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowKeys(r) POINTER_SHIFT(r, *(uint16_t *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(int16_t)))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
SKVRow tdKVRowDup(SKVRow row);
int32_t tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
int32_t tdEncodeKVRow(void **buf, SKVRow row);
void *tdDecodeKVRow(void *buf, SKVRow *row);
void tdSortKVRowByColIdx(SKVRow row);
static FORCE_INLINE int32_t comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE void *tdGetKVRowValOfCol(const SKVRow row, int16_t colId) {
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
if (ret == NULL) return NULL;
return kvRowColVal(row, (SColIdx *)ret);
}
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
}
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
uint16_t alloc;
uint16_t size;
void *buf;
} SKVRowBuilder;
int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
pBuilder->nCols++;
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
void *buf = taosMemoryRealloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1;
pBuilder->buf = buf;
}
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
return 0;
}
#endif
#ifdef __cplusplus
......@@ -476,3 +422,4 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co
#endif
#endif /*_TD_COMMON_DATA_FORMAT_H_*/
......@@ -244,7 +244,7 @@ typedef struct {
const void* pMsg;
} SSubmitMsgIter;
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
......@@ -287,7 +287,7 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
......@@ -945,7 +945,6 @@ typedef struct {
int64_t timeInFetchQueue;
} SQnodeLoad;
typedef struct {
int32_t sver; // software version
int64_t dnodeVer; // dnode table version in sdb
......@@ -1002,7 +1001,6 @@ typedef struct {
typedef struct {
int32_t vgId;
int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN];
int64_t dbUid;
int32_t vgVersion;
......@@ -1025,16 +1023,14 @@ typedef struct {
int8_t compression;
int8_t strict;
int8_t cacheLastRow;
int8_t isTsma;
int8_t standby;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
// for tsma
int8_t isTsma;
void* pTsma;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......@@ -1072,8 +1068,8 @@ typedef struct {
int8_t walLevel;
int8_t strict;
int8_t cacheLastRow;
int8_t replica;
int8_t selfIndex;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SAlterVnodeReq;
......@@ -1786,6 +1782,15 @@ typedef struct SVCreateTbReq {
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->name);
if (req->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(req->ctb.pTag);
} else if (req->type == TSDB_NORMAL_TABLE) {
taosMemoryFreeClear(req->ntb.schemaRow.pSchema);
}
}
typedef struct {
int32_t nReqs;
union {
......@@ -1977,7 +1982,7 @@ typedef struct {
int8_t killConnection;
int8_t align[3];
SEpSet epSet;
SArray *pQnodeList;
SArray* pQnodeList;
} SQueryHbRspBasic;
typedef struct {
......@@ -2207,10 +2212,8 @@ typedef struct {
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char* qmsg;
int64_t suid;
} SMqRebVgReq;
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
......@@ -2221,11 +2224,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeFixedI8(buf, pReq->subType);
// tlen += taosEncodeFixedI8(buf, pReq->withTbName);
// tlen += taosEncodeFixedI8(buf, pReq->withSchema);
// tlen += taosEncodeFixedI8(buf, pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
tlen += taosEncodeString(buf, pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
tlen += taosEncodeFixedI64(buf, pReq->suid);
}
return tlen;
}
......@@ -2237,11 +2239,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeFixedI8(buf, &pReq->subType);
// buf = taosDecodeFixedI8(buf, &pReq->withTbName);
// buf = taosDecodeFixedI8(buf, &pReq->withSchema);
// buf = taosDecodeFixedI8(buf, &pReq->withTag);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeFixedI64(buf, &pReq->suid);
}
return (void*)buf;
}
......@@ -2305,6 +2306,7 @@ typedef struct {
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
......@@ -2474,7 +2476,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int8_t isSchemaAdaptive;
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
SSchemaWrapper schema;
} SMqSubTopicEp;
......@@ -2482,7 +2484,7 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
tlen += taosEncodeString(buf, pTopicEp->db);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
......@@ -2495,7 +2497,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
buf = taosDecodeStringTo(buf, pTopicEp->db);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
......@@ -2573,6 +2575,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0;
......@@ -2582,20 +2590,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW);
} else {
pRsp->blockSchema = NULL;
}
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
} else {
pRsp->blockTbName = NULL;
}
}
}
......@@ -2662,6 +2664,23 @@ typedef struct {
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
// TDMT_VND_DELETE
typedef struct {
TSKEY sKey;
TSKEY eKey;
// super table
char* stbName;
// child/normal
char* tbName;
} SVDeleteReq;
typedef struct {
int32_t code;
// TODO
} SVDeleteRsp;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -221,9 +221,11 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT_VNODE, "vnode-compact-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp)
// Requests handled by QNODE
TD_NEW_MSG_SEG(TDMT_QND_MSG)
......
......@@ -156,6 +156,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
bool fmIsMultiResFunc(int32_t funcId);
bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
......
......@@ -189,6 +189,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF,
QUERY_NODE_LOGIC_PLAN_EXCHANGE,
QUERY_NODE_LOGIC_PLAN_MERGE,
QUERY_NODE_LOGIC_PLAN_WINDOW,
QUERY_NODE_LOGIC_PLAN_FILL,
QUERY_NODE_LOGIC_PLAN_SORT,
......@@ -206,6 +207,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
......
......@@ -95,9 +95,15 @@ typedef struct SVnodeModifLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
uint8_t precision;
} SExchangeLogicNode;
typedef struct SMergeLogicNode {
SLogicNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef struct SWindowLogicNode {
......@@ -268,6 +274,13 @@ typedef struct SExchangePhysiNode {
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
typedef struct SMergePhysiNode {
SPhysiNode node;
SNodeList* pMergeKeys;
int32_t numOfChannels;
int32_t srcGroupId;
} SMergePhysiNode;
typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
......
......@@ -62,10 +62,7 @@ enum {
typedef struct {
int8_t type;
int32_t sourceVg;
int64_t sourceVer;
int64_t ver;
int32_t* dataRef;
SSubmitReq* data;
} SStreamDataSubmit;
......@@ -83,6 +80,37 @@ typedef struct {
int8_t type;
} SStreamCheckpoint;
typedef struct {
STaosQueue* queue;
STaosQall* qall;
void* qItem;
int8_t failed;
} SStreamQ;
static FORCE_INLINE void* streamQCurItem(SStreamQ* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQNextItem(SStreamQ* queue) {
int8_t failed = atomic_load_8(&queue->failed);
if (failed) {
ASSERT(queue->qItem != NULL);
return streamQCurItem(queue);
} else {
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQCurItem(queue);
}
}
static FORCE_INLINE void streamQSetFail(SStreamQ* queue) { atomic_store_8(&queue->failed, 1); }
static FORCE_INLINE void streamQSetSuccess(SStreamQ* queue) { atomic_store_8(&queue->failed, 0); }
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pDataSubmit == NULL) return NULL;
......@@ -111,6 +139,8 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
}
}
SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
......@@ -209,8 +239,6 @@ struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// source preprocess
// exec
STaskExec exec;
......@@ -318,8 +346,6 @@ int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
int32_t streamTaskRun(SStreamTask* pTask);
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
......
......@@ -33,8 +33,19 @@ extern "C" {
#ifdef WINDOWS
#define TD_TMP_DIR_PATH "C:\\Windows\\Temp\\"
#define TD_CFG_DIR_PATH "C:\\TDengine\\cfg\\"
#define TD_DATA_DIR_PATH "C:\\TDengine\\data\\"
#define TD_LOG_DIR_PATH "C:\\TDengine\\log\\"
#elif defined(_TD_DARWIN_64)
#define TD_TMP_DIR_PATH "/tmp/taosd/"
#define TD_CFG_DIR_PATH "/usr/local/etc/taos/"
#define TD_DATA_DIR_PATH "/usr/local/var/lib/taos/"
#define TD_LOG_DIR_PATH "/usr/local/var/log/taos/"
#else
#define TD_TMP_DIR_PATH "/tmp/"
#define TD_CFG_DIR_PATH "/etc/taos/"
#define TD_DATA_DIR_PATH "/var/lib/taos/"
#define TD_LOG_DIR_PATH "/var/log/taos/"
#endif
typedef struct TdDir *TdDirPtr;
......
......@@ -70,6 +70,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......@@ -84,6 +85,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0102)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0103)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_RPC_INDIRECT_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0105)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......@@ -183,8 +185,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x035A)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035B)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
......
......@@ -209,7 +209,7 @@ typedef enum ELogicConditionType {
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
......
......@@ -534,6 +534,26 @@ static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutFloat(uint8_t* p, float f) {
union {
uint32_t ui;
float f;
} v;
v.f = f;
return tPutU32(p, v.ui);
}
static FORCE_INLINE int32_t tPutDouble(uint8_t* p, double d) {
union {
uint64_t ui;
double d;
} v;
v.d = d;
return tPutU64(p, v.ui);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
......@@ -623,6 +643,34 @@ static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
return n;
}
static FORCE_INLINE int32_t tGetFloat(uint8_t* p, float* f) {
int32_t n = 0;
union {
uint32_t ui;
float f;
} v;
n = tGetU32(p, &v.ui);
*f = v.f;
return n;
}
static FORCE_INLINE int32_t tGetDouble(uint8_t* p, double* d) {
int32_t n = 0;
union {
uint64_t ui;
double d;
} v;
n = tGetU64(p, &v.ui);
*d = v.d;
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
......@@ -646,6 +694,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n
return n;
}
static FORCE_INLINE int32_t tPutCStr(uint8_t* p, char* pData) {
return tPutBinary(p, (uint8_t*)pData, strlen(pData) + 1);
}
static FORCE_INLINE int32_t tGetCStr(uint8_t* p, char** ppData) { return tGetBinary(p, (uint8_t**)ppData, NULL); }
#ifdef __cplusplus
}
#endif
......
......@@ -31,16 +31,16 @@ extern int32_t taosTmrThreads;
void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label);
void taosTmrCleanUp(void *handle);
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle);
bool taosTmrStop(tmr_h tmrId);
bool taosTmrStopA(tmr_h *timerId);
bool taosTmrStopA(tmr_h *tmrId);
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
void taosTmrCleanUp(void *handle);
#ifdef __cplusplus
}
#endif
......
......@@ -192,6 +192,7 @@ typedef struct SRequestSendRecvBody {
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
......@@ -220,7 +221,8 @@ typedef struct SRequestObj {
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
......@@ -242,7 +244,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
taosMemoryFreeClear(msg->resInfo.length);
taosMemoryFreeClear(msg->resInfo.convertBuf);
}
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
return &msg->resInfo;
}
return NULL;
......@@ -320,7 +322,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
#ifdef __cplusplus
}
......
......@@ -203,7 +203,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
}
return code;
}
......@@ -230,8 +230,8 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
}
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1;
SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2;
SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
if (node1->load < node2->load) {
return -1;
......@@ -240,7 +240,7 @@ int compareQueryNodeLoad(const void* elem1, const void* elem2) {
return node1->load > node2->load;
}
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
taosThreadMutexLock(&pInfo->qnodeMutex);
if (pInfo->pQnodeList) {
taosArrayDestroy(pInfo->pQnodeList);
......@@ -257,7 +257,7 @@ int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
}
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo;
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
int32_t code = 0;
taosThreadMutexLock(&pInfo->qnodeMutex);
......@@ -384,7 +384,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
return pRequest->code;
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
......@@ -826,7 +825,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code));
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
tstrerror(code));
return;
}
......@@ -839,7 +839,6 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
}
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
......@@ -971,7 +970,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL;
}
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
......@@ -993,9 +993,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return pResultInfo->row;
}
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
//return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
// return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
......@@ -1013,7 +1012,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
return NULL;
}
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
......@@ -1057,27 +1057,20 @@ static char* parseTagDatatoJson(void* p) {
goto end;
}
int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx* pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
SArray* pTagVals = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end;
}
continue;
}
int16_t nCols = taosArrayGetSize(pTagVals);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
char type = pTagVal->type;
if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL) {
......@@ -1086,11 +1079,12 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
pTagVal->pData);
taosMemoryFree(tagJsonValue);
goto end;
}
......@@ -1099,7 +1093,7 @@ static char* parseTagDatatoJson(void* p) {
if (value == NULL) {
goto end;
}
} else if (varDataLen(realData) == 0) {
} else if (pTagVal->nData == 0) {
value = cJSON_CreateString("");
} else {
ASSERT(0);
......@@ -1107,22 +1101,14 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) {
goto end;
......@@ -1187,7 +1173,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
} else if (jsonInnerType == TD_TAG_JSON) {
char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
......@@ -1206,10 +1192,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
......@@ -1320,10 +1302,11 @@ void resetConnectDB(STscObj* pTscObj) {
taosThreadMutexUnlock(&pTscObj->mutex);
}
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse) {
assert(pResultInfo != NULL && pRsp != NULL);
taosMemoryFreeClear(pResultInfo->pRspMsg);
if (freeAfterUse) taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
......
......@@ -143,6 +143,7 @@ typedef struct {
typedef struct {
// subscribe info
char* topicName;
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
......@@ -1039,6 +1040,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
topic.schema = pTopicEp->schema;
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
......@@ -1283,7 +1285,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
......@@ -1506,6 +1509,15 @@ const char* tmq_get_topic_name(TAOS_RES* res) {
}
}
const char* tmq_get_db_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else {
return NULL;
}
}
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
......
......@@ -36,6 +36,7 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
......
......@@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData);
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = kvRowLen(pData + CHAR_BYTES);
dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*pData == TD_TAG_JSON) { // json string
dataLen = ((STag*)(pData))->len;
} else {
ASSERT(0);
}
dataLen += CHAR_BYTES;
}else {
dataLen = varDataTLen(pData);
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
......@@ -1634,6 +1635,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if(!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// cal size
int32_t cap = sizeof(SSubmitReq);
......@@ -1655,18 +1661,33 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
taosMemoryFree(cname);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
......@@ -1709,22 +1730,42 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
}
blkHead->schemaLen = htonl(schemaLen);
......@@ -1759,5 +1800,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
return ret;
}
此差异已折叠。
......@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
......@@ -165,7 +165,6 @@ int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) {
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
......@@ -2935,7 +2934,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
......@@ -2958,6 +2956,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -2992,7 +2991,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
......@@ -3015,6 +3013,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -3054,7 +3053,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
if(pReq->isTsma) {
if (pReq->isTsma) {
taosMemoryFreeClear(pReq->pTsma);
}
return 0;
......@@ -3135,8 +3134,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
......@@ -3166,8 +3165,8 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
......@@ -3655,6 +3654,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeI8(pCoder, pSma->intervalUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->slidingUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->timezoneInt) < 0) return -1;
if (tEncodeI32(pCoder, pSma->dstVgId) < 0) return -1;
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
......@@ -3677,6 +3677,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
......@@ -3910,7 +3911,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else {
......@@ -3922,8 +3923,6 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
}
int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
......@@ -3935,7 +3934,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else {
......
......@@ -219,9 +219,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -150,20 +150,26 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.minRows = pCreate->minRows;
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
if (i == 0) {
if ((pRetention->freq > 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
}
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
pCfg->standby = pCfg->standby;
pCfg->syncCfg.myIndex = pCreate->selfIndex;
pCfg->syncCfg.replicaNum = pCreate->replica;
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int i = 0; i < pCreate->replica; ++i) {
pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port;
snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s",
pCreate->replicas[i].fqdn);
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
pNode->nodePort = pCreate->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
}
}
......@@ -176,6 +182,8 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq createReq = {0};
SVnodeCfg vnodeCfg = {0};
SWrapperCfg wrapperCfg = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
......@@ -184,12 +192,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma);
SVnodeCfg vnodeCfg = {0};
dDebug("vgId:%d, create vnode req is received, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
createReq.standby);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
......@@ -218,9 +223,20 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
if (code != 0) {
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
if (createReq.isTsma) {
SMsgHead *smaMsg = createReq.pTsma;
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
if (vnodeProcessCreateTSma(pImpl, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen) < 0) {
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
};
}
code = vnodeStart(pImpl);
if (code != 0) {
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
......@@ -228,7 +244,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) goto _OVER;
if (code != 0) {
code = terrno;
goto _OVER;
}
_OVER:
if (code != 0) {
......@@ -314,8 +333,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
......@@ -411,18 +411,14 @@ typedef struct {
int64_t dbUid;
int32_t version;
int8_t subType; // column, db or stable
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
SRWLatch lock;
int32_t consumerCnt;
int32_t sqlLen;
int32_t astLen;
char* sql;
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
// int32_t refConsumerCnt;
int64_t stbUid;
} SMqTopicObj;
typedef struct {
......@@ -481,9 +477,7 @@ typedef struct {
int64_t dbUid;
int32_t vgNum;
int8_t subType;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
} SMqSubscribeObj;
......@@ -535,7 +529,7 @@ typedef struct {
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char name[TSDB_STREAM_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
......
......@@ -61,7 +61,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetDbName(STrans *pTrans, const char *dbname);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
......
......@@ -36,7 +36,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -306,6 +306,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
topicEp.schema.nCols = pTopic->schema.nCols;
if (topicEp.schema.nCols) {
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
......
此差异已折叠。
......@@ -395,10 +395,8 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosInitRWLatch(&pSubNew->lock);
pSubNew->dbUid = pSub->dbUid;
pSubNew->stbUid = pSub->stbUid;
pSubNew->subType = pSub->subType;
/*pSubNew->withTbName = pSub->withTbName;*/
/*pSubNew->withSchema = pSub->withSchema;*/
/*pSubNew->withTag = pSub->withTag;*/
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......@@ -431,9 +429,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType);
/*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
tlen += taosEncodeFixedI64(buf, pSub->stbUid);
void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash);
......@@ -458,9 +454,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType);
/*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
buf = taosDecodeFixedI64(buf, &pSub->stbUid);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
......
......@@ -613,7 +613,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_DEPLOYED;
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
goto DROP_DNODE_OVER;
}
......
......@@ -392,11 +392,6 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
mDebug("mnode:%d, start to create", createReq.dnodeId);
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
goto _OVER;
}
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
......@@ -405,12 +400,22 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
goto _OVER;
}
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto _OVER;
}
if (!mndIsDnodeOnline(pMnode, pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
......@@ -632,11 +637,12 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0;
int32_t cols = 0;
SMnodeObj *pObj = NULL;
ESdbStatus objStatus;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj);
pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus);
if (pShow->pIter == NULL) break;
cols = 0;
......@@ -649,23 +655,26 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false);
bool online = mndIsDnodeOnline(pMnode, pObj->pDnode, curMs);
const char *roles = NULL;
const char *roles = "OFFLINE";
if (pObj->id == pMnode->selfDnodeId) {
roles = syncStr(TAOS_SYNC_STATE_LEADER);
} else {
if (!online) {
roles = "OFFLINE";
} else {
roles = syncStr(pObj->state);
}
if (pObj->pDnode && mndIsDnodeOnline(pMnode, pObj->pDnode, curMs)) {
roles = syncStr(pObj->state);
}
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
char b2[12 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
const char *status = "READY";
if (objStatus == SDB_STATUS_CREATING) status = "CREATING";
if (objStatus == SDB_STATUS_DROPPING) status = "DROPPING";
char b3[9 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)b3, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
......
......@@ -426,7 +426,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
pVgroup->pTsma = pSmaReq;
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;
......@@ -512,7 +512,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetSerial(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
......@@ -757,7 +757,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册