提交 6c9e72d9 编写于 作者: haoranc's avatar haoranc

Merge branch 'main' of https://github.com/taosdata/TDengine into main

......@@ -175,7 +175,7 @@ cd TDengine
```bash
mkdir debug
cd debug
cmake .. -DBUILD_TOOLS=true
cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
......
......@@ -183,7 +183,7 @@ It equals to execute following commands:
```bash
mkdir debug
cd debug
cmake .. -DBUILD_TOOLS=true
cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true
make
```
......
......@@ -4,5 +4,5 @@ if [ ! -d debug ]; then
mkdir debug || echo -e "failed to make directory for build"
fi
cd debug && cmake .. -DBUILD_TOOLS=true && make
cd debug && cmake .. -DBUILD_TOOLS=true -DBUILD_CONTRIB=true && make
......@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.1.0.0.alpha")
SET(TD_VER_NUMBER "3.1.0.1.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -32,6 +32,20 @@ docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043
Note that TDengine Server 3.0 uses TCP port 6030. Port 6041 is used by taosAdapter for the REST API service. Ports 6043 through 6049 are used by taosAdapter for other connectors. You can open these ports as needed.
If you need to persist data to a specific directory on your local machine, please run the following command:
```shell
docker run -d -v ~/data/taos/dnode/data:/var/lib/taos \
-v ~/data/taos/dnode/log:/var/log/taos \
-p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
```
:::note
- /var/lib/taos: TDengine's default data file directory. The location can be changed via [configuration file]. Also you can modify ~/data/taos/dnode/data to your any local empty data directory
- /var/log/taos: TDengine's default log file directory. The location can be changed via [configure file]. you can modify ~/data/taos/dnode/log to your any local empty log directory
:::
Run the following command to ensure that your container is running:
```shell
......@@ -113,4 +127,4 @@ In the query above you are selecting the first timestamp (ts) in the interval, a
## Additional Information
For more information about deploying TDengine in a Docker environment, see [Using TDengine in Docker](../../reference/docker).
For more information about deploying TDengine in a Docker environment, see [Deploying TDengine with Docker](../../deployment/docker).
---
title: Deploying TDengine with Docker
sidebar_label: Docker
description: This chapter describes how to start and access TDengine in a Docker container.
---
......@@ -10,8 +11,17 @@ This chapter describes how to start the TDengine service in a container and acce
The TDengine image starts with the HTTP service activated by default, using the following command:
```shell
docker run -d --name tdengine -p 6041:6041 tdengine/tdengine
docker run -d --name tdengine \
-v ~/data/taos/dnode/data:/var/lib/taos \
-v ~/data/taos/dnode/log:/var/log/taos \
-p 6041:6041 tdengine/tdengine
```
:::note
* /var/lib/taos: TDengine's default data file directory. The location can be changed via [configuration file]. And also you can modify ~/data/taos/dnode/data to your any other local emtpy data directory
* /var/log/taos: TDengine's default log file directory. The location can be changed via [configure file]. And also you can modify ~/data/taos/dnode/log to your any other local empty log directory
:::
The above command starts a container named "tdengine" and maps the HTTP service port 6041 to the host port 6041. You can verify that the HTTP service provided in this container is available using the following command.
......@@ -283,39 +293,38 @@ services:
environment:
TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1"
ports:
- 6041:6041
- 6030:6030
volumes:
- taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/
# /var/lib/taos: TDengine's default data file directory. The location can be changed via [configuration file]. you can modify ~/data/taos/dnode1/data to your own data directory
- ~/data/taos/dnode1/data:/var/lib/taos
# /var/log/taos: TDengine's default log file directory. The location can be changed via [configure file]. you can modify ~/data/taos/dnode1/log to your own log directory
- ~/data/taos/dnode1/log:/var/log/taos
td-2:
image: tdengine/tdengine:$VERSION
environment:
TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/
- ~/data/taos/dnode2/data:/var/lib/taos
- ~/data/taos/dnode2/log:/var/log/taos
td-3:
image: tdengine/tdengine:$VERSION
environment:
TAOS_FQDN: "td-3"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td3:/var/lib/taos/
- taoslog-td3:/var/log/taos/
volumes:
taosdata-td1:
taoslog-td1:
taosdata-td2:
taoslog-td2:
taosdata-td3:
taoslog-td3:
- ~/data/taos/dnode3/data:/var/lib/taos
- ~/data/taos/dnode3/log:/var/log/taos
```
:::note
- The `VERSION` environment variable is used to set the tdengine image tag
- `TAOS_FIRST_EP` must be set on the newly created instance so that it can join the TDengine cluster; if there is a high availability requirement, `TAOS_SECOND_EP` needs to be used at the same time
:::
:::
2. Start the cluster
......@@ -382,24 +391,22 @@ networks:
services:
td-1:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/
# /var/lib/taos: TDengine's default data file directory. The location can be changed via [configuration file]. you can modify ~/data/taos/dnode1/data to your own data directory
- ~/data/taos/dnode1/data:/var/lib/taos
# /var/log/taos: TDengine's default log file directory. The location can be changed via [configure file]. you can modify ~/data/taos/dnode1/log to your own log directory
- ~/data/taos/dnode1/log:/var/log/taos
td-2:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/
- ~/data/taos/dnode2/data:/var/lib/taos
- ~/data/taos/dnode2/log:/var/log/taos
adapter:
image: tdengine/tdengine:$VERSION
entrypoint: "taosadapter"
......@@ -431,11 +438,6 @@ services:
>> /etc/nginx/nginx.conf;cat /etc/nginx/nginx.conf;
nginx -g 'daemon off;'",
]
volumes:
taosdata-td1:
taoslog-td1:
taosdata-td2:
taoslog-td2:
```
## Deploy with docker swarm
......
......@@ -5,7 +5,7 @@ description: This document describes how to deploy a TDengine cluster on a serve
TDengine has a native distributed design and provides the ability to scale out. A few nodes can form a TDengine cluster. If you need higher processing power, you just need to add more nodes into the cluster. TDengine uses virtual node technology to virtualize a node into multiple virtual nodes to achieve load balancing. At the same time, TDengine can group virtual nodes on different nodes into virtual node groups, and use the replication mechanism to ensure the high availability of the system. The cluster feature of TDengine is completely open source.
This document describes how to manually deploy a cluster on a host as well as how to deploy on Kubernetes and by using Helm.
This document describes how to manually deploy a cluster on a host directly and deploy a cluster with Docker, Kubernetes or Helm.
```mdx-code-block
import DocCardList from '@theme/DocCardList';
......
......@@ -42,7 +42,7 @@ In TDengine, the data types below can be used when specifying a column or tag.
| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
| 16 | VARCHAR | User-defined | Alias of BINARY |
| 16 | GEOMETRY | User-defined | Geometry |
| 17 | GEOMETRY | User-defined | Geometry |
:::note
- Each row of the table cannot be longer than 48KB (64KB since version 3.0.5.0) (note that each BINARY/NCHAR/GEOMETRY column takes up an additional 2 bytes of storage space).
......
......@@ -1274,3 +1274,161 @@ SELECT SERVER_STATUS();
```
**Description**: The server status.
## Geometry Functions
### Geometry Input Functions
Geometry input functions create geometry data from WTK.
#### ST_GeomFromText
```sql
ST_GeomFromText(VARCHAR WKT expr)
```
**Description**: Return a specified GEOMETRY value from Well-Known Text representation (WKT).
**Return value type**: GEOMETRY
**Applicable data types**: VARCHAR
**Applicable table types**: standard tables and supertables
**Explanations**:
- The input can be one of WTK string, like POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION.
- The output is a GEOMETRY data type, internal defined as binary string.
### Geometry Output Functions
Geometry output functions convert geometry data into WTK.
#### ST_AsText
```sql
ST_AsText(GEOMETRY geom)
```
**Description**: Return a specified Well-Known Text representation (WKT) value from GEOMETRY data.
**Return value type**: VARCHAR
**Applicable data types**: GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- The output can be one of WTK string, like POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION.
### Geometry Relationships Functions
Geometry relationships functions determine spatial relationships between geometries.
#### ST_Intersects
```sql
ST_Intersects(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Compares two geometries and returns true if they intersect.
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- Geometries intersect if they have any point in common.
#### ST_Equals
```sql
ST_Equals(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Returns TRUE if the given geometries are "spatially equal".
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- 'Spatially equal' means ST_Contains(A,B) = true and ST_Contains(B,A) = true, and the ordering of points can be different but represent the same geometry structure.
#### ST_Touches
```sql
ST_Touches(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Returns TRUE if A and B intersect, but their interiors do not intersect.
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- A and B have at least one point in common, and the common points lie in at least one boundary.
- For Point/Point inputs the relationship is always FALSE, since points do not have a boundary.
#### ST_Covers
```sql
ST_Covers(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Returns TRUE if every point in Geometry B lies inside (intersects the interior or boundary of) Geometry A.
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- A covers B means no point of B lies outside (in the exterior of) A.
#### ST_Contains
```sql
ST_Contains(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Returns TRUE if geometry A contains geometry B.
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**:
- A contains B if and only if all points of B lie inside (i.e. in the interior or boundary of) A (or equivalently, no points of B lie in the exterior of A), and the interiors of A and B have at least one point in common.
#### ST_ContainsProperly
```sql
ST_ContainsProperly(GEOMETRY geomA, GEOMETRY geomB)
```
**Description**: Returns TRUE if every point of B lies inside A.
**Return value type**: BOOL
**Applicable data types**: GEOMETRY, GEOMETRY
**Applicable table types**: standard tables and supertables
**Explanations**
- There is no point of B that lies on the boundary of A or in the exterior of A.
......@@ -54,7 +54,7 @@ LIKE is used together with wildcards to match strings. Its usage is described as
MATCH and NMATCH are used together with regular expressions to match strings. Their usage is described as follows:
- Use POSIX regular expression syntax. For more information, see Regular Expressions.
- Regular expression can be used against only table names, i.e. `tbname`, and tags of binary/nchar types, but can't be used against data columns.
- Regular expression can be used against only table names, i.e. `tbname`, and tags/columns of binary/nchar types.
- The maximum length of regular expression string is 128 bytes. Configuration parameter `maxRegexStringLen` can be used to set the maximum allowed regular expression. It's a configuration parameter on the client side, and will take effect after restarting the client.
## Logical Operators
......
......@@ -24,8 +24,9 @@ install.packages("RJDBC", repos='http://cran.us.r-project.org')
```
:::note
On Linux systems, installing the RJDBC package may require installing the necessary components for compilation. For example, on Ubuntu, you can execute the command ``apt install -y libbz2-dev libpcre2-dev libicu-dev`` to install the required components.
On Windows systems, you need to set the **JAVA_HOME** environment variable.
1. The default R language package version 4.2 which shipped with Ubuntu might lead unresponsive bug. Please install latest version of R language package from the [official website](https://www.r-project.org/).
2. On Linux systems, installing the RJDBC package may require installing the necessary components for compilation. For example, on Ubuntu, you can execute the command ``apt install -y libbz2-dev libpcre2-dev libicu-dev`` to install the required components.
3. On Windows systems, you need to set the **JAVA_HOME** environment variable.
:::
3. Download the TDengine JDBC driver: Visit the Maven website and download the TDengine JDBC driver (taos-jdbcdriver-X.X.X-dist.jar) to your local machine.
......
......@@ -7,10 +7,10 @@ description: This document describes the supported platforms for the TDengine se
| | **Windows Server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18 or later** | **macOS** |
| ------------ | ---------------------------- | ----------------- | ---------------- | ---------------- | --------- |
| X64 | ● | ● | ● | ● | ● |
| X64 | ●/E | ●/E | ● | ● | ● |
| ARM64 | | | ● | | ● |
Note: ● means officially tested and verified, ○ means unofficially tested and verified.
Note: 1) ● means officially tested and verified, ○ means unofficially tested and verified, E means only supported by the enterprise edition. 2) The community edition only supports newer versions of mainstream operating systems, including Ubuntu 18+/CentOS 7+/RetHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS, etc. If you have requirements for other operating systems and editions, please contact support of the enterprise edition.
## List of supported platforms for TDengine clients and connectors
......
label: TDengine Docker images
\ No newline at end of file
......@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
## 3.1.0.0
<Release type="tdengine" version="3.1.0.0" />
## 3.0.7.1
<Release type="tdengine" version="3.0.7.1" />
......
......@@ -12,5 +12,9 @@ driver_path = args[1] # path to jdbc-driver for example: "/root/taos-jdbcdriver-
driver = JDBC("com.taosdata.jdbc.rs.RestfulDriver", driver_path)
conn = dbConnect(driver, "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata")
dbGetQuery(conn, "SELECT server_version()")
dbSendUpdate(conn, "create database if not exists rtest")
dbSendUpdate(conn, "create table if not exists rtest.test (ts timestamp, current float, voltage int, devname varchar(20))")
dbSendUpdate(conn, "insert into rtest.test values (now, 1.2, 220, 'test')")
dbGetQuery(conn, "select * from rtest.test")
dbDisconnect(conn)
# ANCHOR_END: demo
......@@ -28,6 +28,21 @@ docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043
注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
如果需要将数据持久化到本机的某一个文件夹,则执行下边的命令:
```shell
docker run -d -v ~/data/taos/dnode/data:/var/lib/taos \
-v ~/data/taos/dnode/log:/var/log/taos \
-p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
```
:::note
- /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/data为你自己的数据目录
- /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/log为你自己的日志目录
:::
确定该容器已经启动并且在正常运行。
```shell
......@@ -108,4 +123,4 @@ SELECT FIRST(ts), AVG(current), MAX(voltage), MIN(phase) FROM test.d10 INTERVAL(
## 其它
更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [在 Docker 下使用 TDengine](../../reference/docker)
更多关于在 Docker 环境下使用 TDengine 的细节,请参考 [用 Docker 部署 TDengine](../../deployment/docker)
......@@ -24,8 +24,9 @@ install.packages("RJDBC", repos='http://cran.us.r-project.org')
```
:::note
在 Linux 上安装 RJDBC 包可能需要安装编译需要的组件,以 Ubuntu 为例执行 `apt install -y libbz2-dev libpcre2-dev libicu-dev` 命令安装。
在 Windows 系统上需要设置 JAVA_HOME 环境变量。
1. Ubuntu 系统自带的 R 语言软件版本 4.2 在调用 RJDBC 库会产生无响应 bug,请安装 R 语言[官网](https://www.r-project.org/)的安装包。
2. 在 Linux 上安装 RJDBC 包可能需要安装编译需要的组件,以 Ubuntu 为例执行 `apt install -y libbz2-dev libpcre2-dev libicu-dev` 命令安装。
3. 在 Windows 系统上需要设置 JAVA_HOME 环境变量。
:::
3. 下载 TDengine JDBC 驱动程序:访问 maven.org 网站,下载 TDengine JDBC 驱动程序(taos-jdbcdriver-X.X.X-dist.jar)。
......
---
title: 用 Docker 部署 TDengine
sidebar_label: Docker
description: '本章主要介绍如何在容器中启动 TDengine 服务并访问它'
---
......@@ -10,8 +11,17 @@ description: '本章主要介绍如何在容器中启动 TDengine 服务并访
TDengine 镜像启动时默认激活 HTTP 服务,使用下列命令
```shell
docker run -d --name tdengine -p 6041:6041 tdengine/tdengine
docker run -d --name tdengine \
-v ~/data/taos/dnode/data:/var/lib/taos \
-v ~/data/taos/dnode/log:/var/log/taos \
-p 6041:6041 tdengine/tdengine
```
:::note
- /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/data为你自己的数据目录
- /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode/log为你自己的日志目录
:::
以上命令启动了一个名为“tdengine”的容器,并把其中的 HTTP 服务的端 6041 映射到了主机端口 6041。使用如下命令可以验证该容器中提供的 HTTP 服务是否可用:
......@@ -291,38 +301,37 @@ services:
environment:
TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1"
ports:
- 6041:6041
- 6030:6030
volumes:
- taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/
# /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode1/data为你自己的数据目录
- ~/data/taos/dnode1/data:/var/lib/taos
# /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode1/log为你自己的日志目录
- ~/data/taos/dnode1/log:/var/log/taos
td-2:
image: tdengine/tdengine:$VERSION
environment:
TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/
- ~/data/taos/dnode2/data:/var/lib/taos
- ~/data/taos/dnode2/log:/var/log/taos
td-3:
image: tdengine/tdengine:$VERSION
environment:
TAOS_FQDN: "td-3"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td3:/var/lib/taos/
- taoslog-td3:/var/log/taos/
volumes:
taosdata-td1:
taoslog-td1:
taosdata-td2:
taoslog-td2:
taosdata-td3:
taoslog-td3:
- ~/data/taos/dnode3/data:/var/lib/taos
- ~/data/taos/dnode3/log:/var/log/taos
```
:::note
* `VERSION` 环境变量被用来设置 tdengine image tag
* 在新创建的实例上必须设置 `TAOS_FIRST_EP` 以使其能够加入 TDengine 集群;如果有高可用需求,则需要同时使用 `TAOS_SECOND_EP`
:::
2. 启动集群
......@@ -397,24 +406,22 @@ networks:
services:
td-1:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-1"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td1:/var/lib/taos/
- taoslog-td1:/var/log/taos/
# /var/lib/taos: TDengine 默认数据文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode1/data为你自己的数据目录
- ~/data/taos/dnode1/data:/var/lib/taos
# /var/log/taos: TDengine 默认日志文件目录。可通过[配置文件]修改位置。你可以修改~/data/taos/dnode1/log为你自己的日志目录
- ~/data/taos/dnode1/log:/var/log/taos
td-2:
image: tdengine/tdengine:$VERSION
networks:
- inter
environment:
TAOS_FQDN: "td-2"
TAOS_FIRST_EP: "td-1"
volumes:
- taosdata-td2:/var/lib/taos/
- taoslog-td2:/var/log/taos/
- ~/data/taos/dnode2/data:/var/lib/taos
- ~/data/taos/dnode2/log:/var/log/taos
adapter:
image: tdengine/tdengine:$VERSION
entrypoint: "taosadapter"
......@@ -446,11 +453,6 @@ services:
>> /etc/nginx/nginx.conf;cat /etc/nginx/nginx.conf;
nginx -g 'daemon off;'",
]
volumes:
taosdata-td1:
taoslog-td1:
taosdata-td2:
taoslog-td2:
```
## 使用 docker swarm 部署
......
......@@ -6,7 +6,7 @@ description: 部署 TDengine 集群的多种方式
TDengine 支持集群,提供水平扩展的能力。如果需要获得更高的处理能力,只需要多增加节点即可。TDengine 采用虚拟节点技术,将一个节点虚拟化为多个虚拟节点,以实现负载均衡。同时,TDengine可以将多个节点上的虚拟节点组成虚拟节点组,通过多副本机制,以保证供系统的高可用。TDengine的集群功能完全开源。
本章节主要介绍如何在主机上人工部署集群,以及如何使用 Kubernetes 和 Helm部署集群。
本章节主要介绍如何在主机上人工部署集群,docker部署,以及如何使用 Kubernetes 和 Helm部署集群。
```mdx-code-block
import DocCardList from '@theme/DocCardList';
......
......@@ -315,7 +315,7 @@ WHERE (column|tbname) match/MATCH/nmatch/NMATCH _regex_
### 使用限制
只能针对表名(即 tbname 筛选)、binary/nchar 类型标签值进行正则表达式过滤,不支持普通列的过滤。
只能针对表名(即 tbname 筛选)、binary/nchar 类型值进行正则表达式过滤。
正则匹配字符串长度不能超过 128 字节。可以通过参数 _maxRegexStringLen_ 设置和调整最大允许的正则匹配字符串,该参数是客户端配置参数,需要重启才能生效。
......
......@@ -1265,3 +1265,140 @@ SELECT SERVER_STATUS();
```
**说明**:检测服务端是否所有 dnode 都在线,如果是则返回成功,否则返回无法建立连接的错误。
## Geometry 函数
### Geometry 输入函数:
#### ST_GeomFromText
```sql
ST_GeomFromText(VARCHAR WKT expr)
```
**功能说明**:根据 Well-Known Text (WKT) 表示从指定的几何值创建几何数据。
**返回值类型**:GEOMETRY
**适用数据类型**:VARCHAR
**适用表类型**:标准表和超表
**使用说明**:输入可以是 WKT 字符串之一,例如点(POINT)、线串(LINESTRING)、多边形(POLYGON)、多点集(MULTIPOINT)、多线串(MULTILINESTRING)、多多边形(MULTIPOLYGON)、几何集合(GEOMETRYCOLLECTION)。输出是以二进制字符串形式定义的 GEOMETRY 数据类型。
### Geometry 输出函数:
#### ST_AsText
```sql
ST_AsText(GEOMETRY geom)
```
**功能说明**:从几何数据中返回指定的 Well-Known Text (WKT) 表示。
**返回值类型**:VARCHAR
**适用数据类型**:GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:输出可以是 WKT 字符串之一,例如点(POINT)、线串(LINESTRING)、多边形(POLYGON)、多点集(MULTIPOINT)、多线串(MULTILINESTRING)、多多边形(MULTIPOLYGON)、几何集合(GEOMETRYCOLLECTION)。
### Geometry 关系函数:
#### ST_Intersects
```sql
ST_Intersects(GEOMETRY geomA, GEOMETRY geomB)
```
##功能说明**:比较两个几何对象,并在它们相交时返回 true。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:如果两个几何对象有任何一个共享点,则它们相交。
#### ST_Equals
```sql
ST_Equals(GEOMETRY geomA, GEOMETRY geomB)
```
**功能说明**:如果给定的几何对象是"空间相等"的,则返回 TRUE。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:"空间相等"意味着 ST_Contains(A,B) = true 和 ST_Contains(B,A) = true,并且点的顺序可能不同,但表示相同的几何结构。
#### ST_Touches
```sql
ST_Touches(GEOMETRY geomA, GEOMETRY geomB)
```
**功能说明**:如果 A 和 B 相交,但它们的内部不相交,则返回 TRUE。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:A 和 B 至少有一个公共点,并且这些公共点位于至少一个边界中。对于点/点输入,关系始终为 FALSE,因为点没有边界。
#### ST_Covers
```sql
ST_Covers(GEOMETRY geomA, GEOMETRY geomB)
```
**功能说明**:如果 B 中的每个点都位于几何形状 A 内部(与内部或边界相交),则返回 TRUE。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:A 包含 B 意味着 B 中的没有点位于 A 的外部(在外部)。
#### ST_Contains
```sql
ST_Contains(GEOMETRY geomA, GEOMETRY geomB)
```
**功能说明**:如果 A 包含 B,描述:如果几何形状 A 包含几何形状 B,则返回 TRUE。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:A 包含 B 当且仅当 B 的所有点位于 A 的内部(即位于内部或边界上)(或等效地,B 的没有点位于 A 的外部),并且 A 和 B 的内部至少有一个公共点。
#### ST_ContainsProperly
```sql
ST_ContainsProperly(GEOMETRY geomA, GEOMETRY geomB)
```
**功能说明**:如果 B 的每个点都位于 A 内部,则返回 TRUE。
**返回值类型**:BOOL
**适用数据类型**:GEOMETRY,GEOMETRY
**适用表类型**:标准表和超表
**使用说明**:B 的没有点位于 A 的边界或外部。
......@@ -7,12 +7,13 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
| | **Windows server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18 以上** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **macOS** |
| ------------ | ---------------------------- | ----------------- | ---------------- | ---------------- | ------------ | ----------------- | ---------------- | --------- |
| X64 | ● | ● | ● | ● | ● | ● | ● | ● |
| 树莓派 ARM64 | | | ● | | | | | |
| 华为云 ARM64 | | | | ● | | | | |
| M1 | | | | | | | | ● |
| X64 | ●/E | ●/E | ● | ● | ●/E | ●/E | ●/E | ● |
| 树莓派 ARM64 | | | ● | | | | | |
| 华为云 ARM64 | | | | ● | | | | |
| M1 | | | | | | | | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。
2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RetHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。
## TDengine 客户端和连接器支持的平台列表
......
label: TDengine Docker 镜像
\ No newline at end of file
......@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.1.0.0
<Release type="tdengine" version="3.1.0.0" />
## 3.0.7.1
<Release type="tdengine" version="3.0.7.1" />
......
......@@ -20,18 +20,12 @@ mvn clean compile exec:java -Dexec.mainClass="com.taosdata.example.JdbcDemo" -De
```
## Compile the Demo Code and Run It
To compile taos-jdbcdriver, go to the source directory ``TDengine/src/connector/jdbc`` and execute
```
mvn clean package -Dmaven.test.skip=true
```
To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/JDBCDemo`` and execute
To run JDBCDemo.jar, execute
```
mvn clean package assembly:single
```
To run JDBCDemo.jar, go to ``TDengine/tests/examples/JDBC/JDBCDemo`` and execute
```
java -Djava.ext.dirs=../../../../src/connector/jdbc/target:$JAVA_HOME/jre/lib/ext -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host [HOSTNAME]
java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host [HOSTNAME]
```
......@@ -16,8 +16,6 @@ public class JdbcRestfulDemo {
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
Connection conn = DriverManager.getConnection(url, properties);
Statement stmt = conn.createStatement();
......
......@@ -30,6 +30,7 @@ extern "C" {
typedef struct SStreamTask SStreamTask;
#define SSTREAM_TASK_VER 1
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
......@@ -266,13 +267,13 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;
typedef struct SHistDataRange {
......@@ -309,6 +310,7 @@ typedef struct {
} STaskTimestamp;
struct SStreamTask {
int64_t ver;
SStreamId id;
SSTaskBasicInfo info;
STaskOutputInfo outputInfo;
......@@ -589,10 +591,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
......@@ -628,7 +630,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta
......@@ -642,7 +645,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
......@@ -659,7 +662,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
......
......@@ -912,10 +912,10 @@ function updateProduct() {
else
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ./${serverName2}${NC}"
[ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter &${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter ${NC}"
fi
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper &${NC}"
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper ${NC}"
if [ ${openresty_work} = 'true' ]; then
echo -e "${GREEN_DARK}To access ${productName2} ${NC}: use ${GREEN_UNDERLINE}${clientName2} -h $serverFqdn${NC} in shell OR from ${GREEN_UNDERLINE}http://127.0.0.1:${web_port}${NC}"
......@@ -996,10 +996,10 @@ function installProduct() {
else
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${serverName2}${NC}"
[ -f ${installDir}/bin/${clientName2}adapter ] && \
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter &${NC}"
echo -e "${GREEN_DARK}To start ${clientName2}Adapter ${NC}: ${clientName2}adapter ${NC}"
fi
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper &${NC}"
echo -e "${GREEN_DARK}To enable ${clientName2}keeper ${NC}: sudo systemctl enable ${clientName2}keeper ${NC}"
if [ ! -z "$firstEp" ]; then
tmpFqdn=${firstEp%%:*}
......
......@@ -70,6 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j);
pTask->ver = SSTREAM_TASK_VER;
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
}
}
......@@ -154,7 +155,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
static void* freeStreamTasks(SArray* pTaskLevel) {
static void *freeStreamTasks(SArray *pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
......@@ -192,14 +193,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId;
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew;
}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp) {
// taosMemoryFreeClear(pVgEp->qmsg);
// taosMemoryFreeClear(pVgEp->qmsg);
taosMemoryFree(pVgEp);
}
}
......@@ -207,14 +208,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
// tlen += taosEncodeString(buf, pVgEp->qmsg);
// tlen += taosEncodeString(buf, pVgEp->qmsg);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
if(sver == 1){
if (sver == 1) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
buf = POINTER_SHIFT(buf, size);
......@@ -223,7 +224,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
return (void *)buf;
}
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) {
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -260,12 +261,12 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) {
}
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) {
if(pConsumer == NULL) return;
if (pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
if(delete){
if (delete) {
taosMemoryFree(pConsumer);
}
}
......@@ -392,7 +393,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
taosArrayPush(pConsumer->assignedTopics, &topic);
}
if(sver > 1){
if (sver > 1) {
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
......@@ -401,18 +402,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf;
}
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew;
//}
// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew;
// }
//
//void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs);
//}
// void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs);
// }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0;
......@@ -420,7 +421,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) {
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows);
......@@ -434,28 +435,28 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
// do nothing
}
}
//#if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp);
// }
//#endif
// #if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp);
// }
// #endif
return tlen;
}
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
if (sver > 1){
if (sver > 1) {
int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){
if (szVgs > 0) {
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pConsumerEp->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
......@@ -470,21 +471,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
}
}
}
//#if 0
// int32_t sz;
// buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
// buf = tDecodeSMqVgEp(buf, pVgEp);
// taosArrayPush(pConsumerEp->vgs, &pVgEp);
// }
//#endif
// #if 0
// int32_t sz;
// buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
// buf = tDecodeSMqVgEp(buf, pVgEp);
// taosArrayPush(pConsumerEp->vgs, &pVgEp);
// }
// #endif
return (void *)buf;
}
SMqSubscribeObj *tNewSubscribeObj(const char* key) {
SMqSubscribeObj *tNewSubscribeObj(const char *key) {
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubObj == NULL) {
return NULL;
......@@ -577,7 +578,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) {
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows);
......@@ -617,14 +618,14 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
buf = taosDecodeStringTo(buf, pSub->dbName);
if (sver > 1){
if (sver > 1) {
int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){
if (szVgs > 0) {
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pSub->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1);
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
......@@ -639,71 +640,71 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
}
}
buf = taosDecodeString(buf, &pSub->qmsg);
}else{
} else {
pSub->qmsg = taosStrdup("");
}
return (void *)buf;
}
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL;
// pEntryNew->epoch = pEntry->epoch;
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pEntryNew;
//}
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL;
// pEntryNew->epoch = pEntry->epoch;
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pEntryNew;
// }
//
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
//}
//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
// int32_t tlen = 0;
// tlen += taosEncodeFixedI32(buf, pEntry->epoch);
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
//}
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
// }
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
// int32_t tlen = 0;
// tlen += taosEncodeFixedI32(buf, pEntry->epoch);
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
// }
//
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
// buf = taosDecodeFixedI32(buf, &pEntry->epoch);
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
//}
//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
// if (pLogNew == NULL) return pLogNew;
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pLogNew;
//}
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
// buf = taosDecodeFixedI32(buf, &pEntry->epoch);
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
// }
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
// if (pLogNew == NULL) return pLogNew;
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pLogNew;
// }
//
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
//}
//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pLog->key);
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
//}
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
// }
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pLog->key);
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen;
// }
//
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
// buf = taosDecodeStringTo(buf, pLog->key);
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
//}
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
// buf = taosDecodeStringTo(buf, pLog->key);
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf;
// }
//
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pOffset->key);
// tlen += taosEncodeFixedI64(buf, pOffset->offset);
// return tlen;
//}
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
// int32_t tlen = 0;
// tlen += taosEncodeString(buf, pOffset->key);
// tlen += taosEncodeFixedI64(buf, pOffset->offset);
// return tlen;
// }
//
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
// buf = taosDecodeStringTo(buf, pOffset->key);
// buf = taosDecodeFixedI64(buf, &pOffset->offset);
// return buf;
//}
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
// buf = taosDecodeStringTo(buf, pOffset->key);
// buf = taosDecodeFixedI64(buf, &pOffset->offset);
// return buf;
// }
......@@ -28,7 +28,7 @@
#include "parser.h"
#include "tname.h"
#define MND_STREAM_VER_NUMBER 2
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
......@@ -140,10 +140,12 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto STREAM_DECODE_OVER;
}
if (sver != 1 && sver != 2) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
if (sver != MND_STREAM_VER_NUMBER) {
terrno = 0;
goto STREAM_DECODE_OVER;
}
......@@ -198,12 +200,13 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
mTrace("stream:%s, perform update action", pOldStream->name);
atomic_exchange_64(&pOldStream->updateTime, pNewStream->updateTime);
atomic_exchange_32(&pOldStream->version, pNewStream->version);
taosWLockLatch(&pOldStream->lock);
pOldStream->status = pNewStream->status;
pOldStream->updateTime = pNewStream->updateTime;
taosWUnLockLatch(&pOldStream->lock);
return 0;
......@@ -429,9 +432,11 @@ FAIL:
return 0;
}
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
pTask->ver = SSTREAM_TASK_VER;
tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
......@@ -520,7 +525,6 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
......@@ -537,7 +541,6 @@ static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStream
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
......@@ -1264,7 +1267,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char idstr[128] = {0};
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
......@@ -1304,7 +1307,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status[20 + VARSTR_HEADER_SIZE] = {0};
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6);
......@@ -1370,7 +1373,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray *tasks) {
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
......@@ -1409,7 +1412,6 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
......@@ -1431,7 +1433,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
if (pStream == NULL) {
if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
......@@ -1440,6 +1441,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
if (pStream->status == STREAM_STATUS__PAUSE) {
sdbRelease(pMnode->pSdb, pStream);
return 0;
}
......@@ -1491,7 +1493,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
......
......@@ -444,6 +444,13 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
if (pIter->pReader == NULL) {
tsdbError("stt file reader is null, %s", idStr);
pIter->pSttBlk = NULL;
pIter->iSttBlk = -1;
return TSDB_CODE_SUCCESS;
}
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
......@@ -814,6 +821,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pList, &pIter);
}
} else if (numOfIter > TARRAY2_SIZE(pSttLevel->fobjArr)){
int32_t inc = numOfIter - TARRAY2_SIZE(pSttLevel->fobjArr);
for (int i = 0; i < inc; ++i) {
SLDataIter *pIter = taosArrayPop(pList);
destroyLDataIter(pIter);
}
}
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
......@@ -829,7 +842,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr->data[i]->fname, &conf, &pSttFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
tsdbError("open stt file reader error. file name %s, code %s, %s", pSttLevel->fobjArr->data[i]->fname,
tstrerror(code), pMTree->idStr);
}
}
......@@ -844,7 +858,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
if (hasVal) {
tMergeTreeAddIter(pMTree, pIter);
......
......@@ -1121,6 +1121,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step);
memcpy(pRecord, p, sizeof(SBrinRecord));
*nextIndex = pBlockInfo->tbBlockIdx + step;
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return true;
}
......
......@@ -1009,7 +1009,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT,
nodesGetNameFromColumnNode(((STargetNode *)pStateNode->pStateKey)->pExpr));
nodesGetNameFromColumnNode(pStateNode->pStateKey));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
......
......@@ -1858,7 +1858,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
}
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0;
......@@ -3574,6 +3574,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
......@@ -3736,6 +3737,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(winInfo, pInfo->pStUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -3754,7 +3756,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
......@@ -3863,6 +3865,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("===stream=== stream session semi agg");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -4373,6 +4376,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
......@@ -4449,7 +4453,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
int32_t code = TSDB_CODE_SUCCESS;
SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
......
......@@ -468,7 +468,8 @@ static int32_t translateStddevMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT,
.precision = pFunc->node.resType.precision};
return TSDB_CODE_SUCCESS;
}
......@@ -491,7 +492,8 @@ static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, in
// pseudo column do not need to check parameters
pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP,
.precision = pFunc->node.resType.precision};
return TSDB_CODE_SUCCESS;
}
......
......@@ -1303,9 +1303,9 @@ static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
}
// if (TSDB_CODE_SUCCESS == code) {
// code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
// }
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -218,11 +218,11 @@ _EXIT:
}
void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg;
RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL);
void* pIter = taosHashIterate(pHandle->cfInst, NULL);
while (pIter != NULL) {
RocksdbCfInst* inst = *pIter;
RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
destroyRocksdbCfInst(inst);
taosHashIterate(pHandle->cfInst, pIter);
pIter = taosHashIterate(pHandle->cfInst, pIter);
}
taosHashCleanup(pHandle->cfInst);
......@@ -833,7 +833,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
qDebug("succ to open rocksdb cf");
}
// close default cf
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]);
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
rocksdb_column_family_handle_destroy(cfHandle[0]);
cfHandle[0] = NULL;
}
rocksdb_options_destroy(cfOpts[0]);
handle->db = db;
......
......@@ -364,6 +364,16 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 7. pause allowed.
streamTaskEnablePause(pStreamTask);
if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);;
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem);
qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
......
......@@ -202,6 +202,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
int32_t len;
int32_t code;
pTask->ver = SSTREAM_TASK_VER;
tEncodeSize(tEncodeStreamTask, pTask, len, code);
if (code < 0) {
return -1;
......@@ -331,7 +332,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while(1) {
while (1) {
taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
......@@ -443,9 +444,20 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
taosArrayDestroy(pRecycleList);
return -1;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask);
if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder);
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
tFreeStreamTask(pTask);
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);
return -1;
}
tDecoderClear(&decoder);
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
......@@ -500,13 +512,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
}
if (taosArrayGetSize(pRecycleList) > 0) {
for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i);
for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, taskId);
}
}
qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t) taosArrayGetSize(pMeta->pTaskList));
qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t)taosArrayGetSize(pMeta->pTaskList));
taosArrayDestroy(pRecycleList);
return 0;
}
......@@ -26,13 +26,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTask->ver = SSTREAM_TASK_VER;
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
pTask->info.taskLevel = taskLevel;
......@@ -72,6 +73,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
......@@ -135,6 +137,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
......@@ -163,7 +168,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
int32_t epSz;
int32_t epSz = -1;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
......
......@@ -135,6 +135,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3581.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3311.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py
......@@ -160,7 +161,7 @@
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
#,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
#tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), logSql) # output sql.txt file
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def create_tables(self):
tdSql.execute("create database if not exists dbus precision 'us'")
tdSql.execute("create database if not exists dbns precision 'ns'")
tdSql.execute("use dbus")
tdSql.execute(f"CREATE STABLE `stb_us` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT) TAGS (`t1` INT)")
tdSql.execute(f"CREATE TABLE `ctb1_us` USING `stb_us` (`t1`) TAGS (1)")
tdSql.execute(f"CREATE TABLE `ctb2_us` USING `stb_us` (`t1`) TAGS (2)")
tdSql.execute("use dbns")
tdSql.execute(f"CREATE STABLE `stb_ns` (`ts` TIMESTAMP, `ip_value` FLOAT, `ip_quality` INT) TAGS (`t1` INT)")
tdSql.execute(f"CREATE TABLE `ctb1_ns` USING `stb_ns` (`t1`) TAGS (1)")
tdSql.execute(f"CREATE TABLE `ctb2_ns` USING `stb_ns` (`t1`) TAGS (2)")
def insert_data(self):
tdLog.debug("start to insert data ............")
tdSql.execute(f"INSERT INTO `dbus`.`ctb1_us` VALUES ('2023-07-01 00:00:00.000', 10.30000, 100)")
tdSql.execute(f"INSERT INTO `dbus`.`ctb2_us` VALUES ('2023-08-01 00:00:00.000', 20.30000, 200)")
tdSql.execute(f"INSERT INTO `dbns`.`ctb1_ns` VALUES ('2023-07-01 00:00:00.000', 10.30000, 100)")
tdSql.execute(f"INSERT INTO `dbns`.`ctb2_ns` VALUES ('2023-08-01 00:00:00.000', 20.30000, 200)")
tdLog.debug("insert data ............ [OK]")
def run(self):
tdSql.prepare()
self.create_tables()
self.insert_data()
tdLog.printNoPrefix("======== test TS-3311")
# test ns
tdSql.query(f"select _wstart, _wend, count(*) from `dbns`.`stb_ns` interval(1n)")
tdSql.checkRows(2)
tdSql.checkData(0, 0, '2023-07-01 00:00:00.000000000')
tdSql.checkData(1, 0, '2023-08-01 00:00:00.000000000')
tdSql.checkData(0, 1, '2023-08-01 00:00:00.000000000')
tdSql.checkData(1, 1, '2023-09-01 00:00:00.000000000')
tdSql.query(f"select _wstart, _wend, count(*) from `dbns`.`stb_ns` interval(12n)")
tdSql.checkRows(1)
tdSql.checkData(0, 0, '2023-01-01 00:00:00.000000000')
tdSql.checkData(0, 1, '2024-01-01 00:00:00.000000000')
tdSql.query(f"select _wstart, _wend, count(*) from `dbns`.`stb_ns` interval(1y)")
tdSql.checkRows(1)
tdSql.checkData(0, 0, '2023-01-01 00:00:00.000000000')
tdSql.checkData(0, 1, '2024-01-01 00:00:00.000000000')
## test us
tdSql.query(f"select _wstart, _wend, count(*) from `dbus`.`stb_us` interval(1n)")
tdSql.checkRows(2)
tdSql.checkData(0, 0, '2023-07-01 00:00:00.000000')
tdSql.checkData(1, 0, '2023-08-01 00:00:00.000000')
tdSql.checkData(0, 1, '2023-08-01 00:00:00.000000')
tdSql.checkData(1, 1, '2023-09-01 00:00:00.000000')
tdSql.query(f"select _wstart, _wend, count(*) from `dbus`.`stb_us` interval(12n)")
tdSql.checkRows(1)
tdSql.checkData(0, 0, '2023-01-01 00:00:00.000000')
tdSql.checkData(0, 1, '2024-01-01 00:00:00.000000')
tdSql.query(f"select _wstart, _wend, count(*) from `dbus`.`stb_us` interval(1y)")
tdSql.checkRows(1)
tdSql.checkData(0, 0, '2023-01-01 00:00:00.000000')
tdSql.checkData(0, 1, '2024-01-01 00:00:00.000000')
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册