提交 c10b8b29 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch '3.0' into feat/sangshuduo/TD-14141-update-taostools-for3.0

......@@ -163,18 +163,6 @@ option(
ON
)
option(
BUILD_WITH_CRAFT
"If build with canonical-raft"
OFF
)
option(
BUILD_WITH_TRAFT
"If build with traft"
OFF
)
IF(${TD_LINUX} MATCHES TRUE)
option(
......
# canonical-raft
ExternalProject_Add(craft
GIT_REPOSITORY https://github.com/canonical/raft.git
GIT_TAG v0.11.2
SOURCE_DIR "${TD_CONTRIB_DIR}/craft"
BINARY_DIR "${TD_CONTRIB_DIR}/craft"
#BUILD_IN_SOURCE TRUE
# https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/
CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure --enable-example
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)
# traft
ExternalProject_Add(traft
GIT_REPOSITORY https://github.com/taosdata/traft.git
GIT_TAG for_3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/traft"
BINARY_DIR "${TD_CONTRIB_DIR}/traft"
#BUILD_IN_SOURCE TRUE
# https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/
CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)
......@@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v3/af"
)
func main() {
......
......@@ -4,7 +4,7 @@ import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosSql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
func main() {
......
......@@ -4,7 +4,7 @@ import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosRestful"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
......
......@@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/taosdata/driver-go/v2/wrapper"
"github.com/taosdata/driver-go/v3/wrapper"
)
func main() {
......
......@@ -2,5 +2,5 @@ module goexample
go 1.17
require github.com/taosdata/driver-go/v2 develop
require github.com/taosdata/driver-go/v3 3.0
......@@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
......
......@@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
......
......@@ -4,7 +4,7 @@ import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosRestful"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func createStable(taos *sql.DB) {
......
......@@ -4,9 +4,9 @@ import (
"fmt"
"time"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v2/af/param"
"github.com/taosdata/driver-go/v2/common"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/param"
"github.com/taosdata/driver-go/v3/common"
)
func checkErr(err error, prompt string) {
......
......@@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v3/af"
)
func prepareDatabase(conn *af.Connector) {
......
......@@ -5,7 +5,7 @@ import (
"fmt"
"time"
_ "github.com/taosdata/driver-go/v2/taosRestful"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
......
package main
import (
"database/sql/driver"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"time"
taos "github.com/taosdata/driver-go/v2/af"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/errors"
"github.com/taosdata/driver-go/v3/wrapper"
)
func main() {
db, err := taos.Open("", "", "", "log", 0)
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
panic(err)
}
defer db.Close()
topic, err := db.Subscribe(false, "taoslogtail", "select ts, level, ipaddr, content from log", time.Second)
_, err = db.Exec("create database if not exists example_tmq")
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(2)
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic with meta as DATABASE example_tmq")
if err != nil {
panic(err)
}
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil {
panic(err)
}
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
defer topic.Unsubscribe(true)
for {
func() {
rows, err := topic.Consume()
defer func() { rows.Close(); time.Sleep(time.Second) }()
if err != nil {
fmt.Println(err)
os.Exit(3)
}
for {
values := make([]driver.Value, 4)
err := rows.Next(values)
if err == io.EOF {
break
} else if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(4)
}
ts := values[0].(time.Time)
level := values[1].(int8)
ipaddr := values[2].(string)
content := values[3].(string)
fmt.Printf("%s %d %s %s\n", ts.Format(time.StampMilli), level, ipaddr, content)
}
}()
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
if result.Type != common.TMQ_RES_TABLE_META {
panic("want message type 2 got " + strconv.Itoa(int(result.Type)))
}
data, _ := json.Marshal(result.Meta)
fmt.Println(string(data))
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
break
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
if result.Type != common.TMQ_RES_DATA {
panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
}
data, _ := json.Marshal(result.Data)
fmt.Println(string(data))
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
break
}
consumer.Close()
}
// 未完成
......@@ -4,11 +4,11 @@ sidebar_label: 文档首页
slug: /
---
TDengine 是一款[高性能](https://www.taosdata.com/fast)[分布式](https://www.taosdata.com/scalable)[支持 SQL](https://www.taosdata.com/sql-support) 的时序数据库 (Database)。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。
TDengine是一款开源、[高性能](https://www.taosdata.com/fast)、云原生的专为物联网、工业互联网、金融等优化设计的时序数据库(Time-Series Database)。同时它还带有内建的缓存、流式计算、数据订阅等系统功能,能大幅减少系统设计的复杂度,降低研发和运营成本,是一极简的时序数据处理平台。本文档是 TDengine 用户手册,主要是介绍 TDengine 的基本概念、安装、使用、功能、开发接口、运营维护、TDengine 内核设计等等,它主要是面向架构师、开发者与系统管理员的。
TDengine 充分利用了时序数据的特点,提出了“一个数据采集点一张表”与“超级表”的概念,设计了创新的存储引擎,让数据的写入、查询和存储效率都得到极大的提升。为正确理解并使用TDengine, 无论如何,请您仔细阅读[基本概念](./concept)一章。
如果你是开发者,请一定仔细阅读[开发指南](./develop)一章,该部分对数据库连接、建模、插入数据、查询、连续查询、缓存、数据订阅、用户自定义函数等功能都做了详细介绍,并配有各种编程语言的示例代码。大部分情况下,你只要把示例代码拷贝粘贴,针对自己的应用稍作改动,就能跑起来。
如果你是开发者,请一定仔细阅读[开发指南](./develop)一章,该部分对数据库连接、建模、插入数据、查询、流式计算、缓存、数据订阅、用户自定义函数等功能都做了详细介绍,并配有各种编程语言的示例代码。大部分情况下,你只要把示例代码拷贝粘贴,针对自己的应用稍作改动,就能跑起来。
我们已经生活在大数据的时代,纵向扩展已经无法满足日益增长的业务需求,任何系统都必须具有水平扩展的能力,集群成为大数据以及 database 系统的不可缺失功能。TDengine 团队不仅实现了集群功能,而且将这一重要核心功能开源。怎么部署、管理和维护 TDengine 集群,请参考[集群管理](./cluster)一章。
......
......@@ -5,7 +5,7 @@ title: REST API
为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
:::note
与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。从 2.2.0.0 版本开始,支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。从 2.4.0.0 版本开始,RESTful 默认由 taosAdapter 提供,要求必须在 URL 中指定 db_name。
与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。
:::
## 安装
......@@ -28,54 +28,204 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t
```json
{
"status": "succ",
"head": [
"name",
"created_time",
"ntables",
"vgroups",
"replica",
"quorum",
"days",
"keep1,keep2,keep(D)",
"cache(MB)",
"blocks",
"minrows",
"maxrows",
"wallevel",
"fsync",
"comp",
"precision",
"status"
],
"data": [
[
"log",
"2020-09-02 17:23:00.039",
4,
1,
1,
1,
10,
"30,30,30",
1,
3,
100,
4096,
1,
3000,
2,
"us",
"ready"
]
],
"rows": 1
"code": 0,
"column_meta": [
[
"name",
"VARCHAR",
64
],
[
"create_time",
"TIMESTAMP",
8
],
[
"vgroups",
"SMALLINT",
2
],
[
"ntables",
"BIGINT",
8
],
[
"replica",
"TINYINT",
1
],
[
"strict",
"VARCHAR",
4
],
[
"duration",
"VARCHAR",
10
],
[
"keep",
"VARCHAR",
32
],
[
"buffer",
"INT",
4
],
[
"pagesize",
"INT",
4
],
[
"pages",
"INT",
4
],
[
"minrows",
"INT",
4
],
[
"maxrows",
"INT",
4
],
[
"comp",
"TINYINT",
1
],
[
"precision",
"VARCHAR",
2
],
[
"status",
"VARCHAR",
10
],
[
"retention",
"VARCHAR",
60
],
[
"single_stable",
"BOOL",
1
],
[
"cachemodel",
"VARCHAR",
11
],
[
"cachesize",
"INT",
4
],
[
"wal_level",
"TINYINT",
1
],
[
"wal_fsync_period",
"INT",
4
],
[
"wal_retention_period",
"INT",
4
],
[
"wal_retention_size",
"BIGINT",
8
],
[
"wal_roll_period",
"INT",
4
],
[
"wal_seg_size",
"BIGINT",
8
]
],
"data": [
[
"information_schema",
null,
null,
14,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
],
[
"performance_schema",
null,
null,
3,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
]
],
"rows": 2
}
```
## HTTP 请求格式
```
```text
http://<fqdn>:<port>/rest/sql/[db_name]
```
......@@ -83,21 +233,21 @@ http://<fqdn>:<port>/rest/sql/[db_name]
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址
- port: 配置文件中 httpPort 配置项,缺省为 6041
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。(从 2.2.0.0 版本开始支持)
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。
- 自定义身份认证信息如下所示(token 稍后介绍)
- [自定义身份认证信息](#自定义授权码)如下所示
```
```text
Authorization: Taosd <TOKEN>
```
- Basic 身份认证信息如下所示
```
```text
Authorization: Basic <TOKEN>
```
......@@ -119,41 +269,165 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
## HTTP 返回格式
返回值为 JSON 格式,如下:
### HTTP 响应码
| **response code** | **说明** |
|-------------------|----------------|
| 200 | 正确返回和 C 接口错误返回 |
| 400 | 参数错误返回 |
| 401 | 鉴权失败 |
| 404 | 接口不存在 |
| 500 | 内部错误 |
| 503 | 系统资源不足 |
### HTTP body 结构
<table>
<tr>
<th>执行结果</th>
<th>说明</th>
<th>样例</th>
</tr>
<tr>
<td>正确执行</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any)列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
```json
{
"status": "succ",
"head": ["ts","current", …],
"column_meta": [["ts",9,8],["current",6,4], …],
"data": [
["2018-10-03 14:38:05.000", 10.3, …],
["2018-10-03 14:38:15.000", 12.6, …]
],
"rows": 2
"code": 0,
"column_meta": [["affected_rows", "INT", 4]],
"data": [[0]],
"rows": 1
}
```
说明:
</td>
</tr>
<tr>
<td>正确查询</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
- status: 告知操作结果是成功还是失败。
- head: 表的定义,如果不返回结果集,则仅有一列 “affected_rows”。(从 2.0.17.0 版本开始,建议不要依赖 head 返回值来判断数据列类型,而推荐使用 column_meta。在后续版本中,有可能会从返回值中去掉 head 这一项。)
- column_meta: 从 2.0.17.0 版本开始,返回值中增加这一项来说明 data 里每一列的数据类型。具体每个列会用三个值来说明,分别为:列名、列类型、类型长度。例如`["current",6,4]`表示列名为“current”;列类型为 6,也即 float 类型;类型长度为 4,也即对应 4 个字节表示的 float。如果列类型为 binary 或 nchar,则类型长度表示该列最多可以保存的内容长度,而不是本次返回值中的具体数据长度。当列类型是 nchar 的时候,其类型长度表示可以保存的 unicode 字符数量,而不是 bytes。
- data: 具体返回的数据,一行一行的呈现,如果不返回结果集,那么就仅有 [[affected_rows]]。data 中每一行的数据列顺序,与 column_meta 中描述数据列的顺序完全一致。
- rows: 表明总共多少行数据。
```json
{
"code": 0,
"column_meta": [
["ts", "TIMESTAMP", 8],
["count", "BIGINT", 8],
["endpoint", "VARCHAR", 45],
["status_code", "INT", 4],
["client_ip", "VARCHAR", 40],
["request_method", "VARCHAR", 15],
["request_uri", "VARCHAR", 128]
],
"data": [
[
"2022-06-29T05:50:55.401Z",
2,
"LAPTOP-NNKFTLTG:6041",
200,
"172.23.208.1",
"POST",
"/rest/sql"
],
[
"2022-06-29T05:52:16.603Z",
1,
"LAPTOP-NNKFTLTG:6041",
200,
"172.23.208.1",
"POST",
"/rest/sql"
],
[
"2022-06-29T06:28:14.118Z",
1,
"LAPTOP-NNKFTLTG:6041",
200,
"172.23.208.1",
"POST",
"/rest/sql"
],
[
"2022-06-29T05:52:16.603Z",
2,
"LAPTOP-NNKFTLTG:6041",
401,
"172.23.208.1",
"POST",
"/rest/sql"
]
],
"rows": 4
}
```
column_meta 中的列类型说明:
</td>
</tr>
<tr>
<td>错误</td>
<td>
code:(int)错误码
<br/>
<br/>
desc:(string)错误描述
</td>
<td>
- 1:BOOL
- 2:TINYINT
- 3:SMALLINT
- 4:INT
- 5:BIGINT
- 6:FLOAT
- 7:DOUBLE
- 8:BINARY
- 9:TIMESTAMP
- 10:NCHAR
```json
{
"code": 9728,
"desc": "syntax error near \"1\""
}
```
</td>
</tr>
</table>
### 说明
- 时间格式仅支持 RFC3339,结果集为 0 时区
- 列类型使用如下字符串:
> "NULL"
> "BOOL"
> "TINYINT"
> "SMALLINT"
> "INT"
> "BIGINT"
> "FLOAT"
> "DOUBLE"
> "VARCHAR"
> "TIMESTAMP"
> "NCHAR"
> "TINYINT UNSIGNED"
> "SMALLINT UNSIGNED"
> "INT UNSIGNED"
> "BIGINT UNSIGNED"
> "JSON"
## 自定义授权码
......@@ -199,19 +473,44 @@ curl http://192.168.0.1:6041/rest/login/root/taosdata
```json
{
"status": "succ",
"head": ["ts", "current", "voltage", "phase"],
"column_meta": [
["ts", 9, 8],
["current", 6, 4],
["voltage", 4, 4],
["phase", 6, 4]
],
"data": [
["2018-10-03 14:38:05.000", 10.3, 219, 0.31],
["2018-10-03 14:38:15.000", 12.6, 218, 0.33]
],
"rows": 2
"code": 0,
"column_meta": [
[
"ts",
"TIMESTAMP",
8
],
[
"current",
"FLOAT",
4
],
[
"voltage",
"INT",
4
],
[
"phase",
"FLOAT",
4
]
],
"data": [
[
"2022-07-30T06:44:40.32Z",
10.3,
219,
0.31
],
[
"2022-07-30T06:44:41.32Z",
12.6,
218,
0.33
]
],
"rows": 2
}
```
......@@ -225,83 +524,23 @@ curl http://192.168.0.1:6041/rest/login/root/taosdata
```json
{
"status": "succ",
"head": ["affected_rows"],
"column_meta": [["affected_rows", 4, 4]],
"data": [[1]],
"rows": 1
"code": 0,
"column_meta": [
[
"affected_rows",
"INT",
4
]
],
"data": [
[
0
]
],
"rows": 1
}
```
## 其他用法
### 结果集采用 Unix 时间戳
HTTP 请求 URL 采用 `/rest/sqlt` 时,返回结果集的时间戳将采用 Unix 时间戳格式表示,例如
```bash
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "select * from demo.d1001" 192.168.0.1:6041/rest/sqlt
```
返回结果:
## 参考
```json
{
"status": "succ",
"head": ["ts", "current", "voltage", "phase"],
"column_meta": [
["ts", 9, 8],
["current", 6, 4],
["voltage", 4, 4],
["phase", 6, 4]
],
"data": [
[1538548685000, 10.3, 219, 0.31],
[1538548695000, 12.6, 218, 0.33]
],
"rows": 2
}
```
### 结果集采用 UTC 时间字符串
HTTP 请求 URL 采用 `/rest/sqlutc` 时,返回结果集的时间戳将采用 UTC 时间字符串表示,例如
```bash
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "select * from demo.t1" 192.168.0.1:6041/rest/sqlutc
```
返回值:
```json
{
"status": "succ",
"head": ["ts", "current", "voltage", "phase"],
"column_meta": [
["ts", 9, 8],
["current", 6, 4],
["voltage", 4, 4],
["phase", 6, 4]
],
"data": [
["2018-10-03T14:38:05.000+0800", 10.3, 219, 0.31],
["2018-10-03T14:38:15.000+0800", 12.6, 218, 0.33]
],
"rows": 2
}
```
## 重要配置项
下面仅列出一些与 RESTful 接口有关的配置参数,其他系统参数请看配置文件里的说明。
- 对外提供 RESTful 服务的端口号,默认绑定到 6041(实际取值是 serverPort + 11,因此可以通过修改 serverPort 参数的设置来修改)。
- httpMaxThreads: 启动的线程数量,默认为 2(2.0.17.0 版本开始,默认值改为 CPU 核数的一半向下取整)。
- restfulRowLimit: 返回结果集(JSON 格式)的最大条数,默认值为 10240。
- httpEnableCompress: 是否支持压缩,默认不支持,目前 TDengine 仅支持 gzip 压缩格式。
- httpDebugFlag: 日志开关,默认 131。131:仅错误和报警信息,135:调试信息,143:非常详细的调试信息。
- httpDbNameMandatory: 是否必须在 RESTful URL 中指定默认的数据库名。默认为 0,即关闭此检查。如果设置为 1,那么每个 RESTful URL 中都必须设置一个默认数据库名,否则无论此时执行的 SQL 语句是否需要指定数据库,都会返回一个执行错误,拒绝执行此 SQL 语句。
:::note
如果使用 taosd 提供的 REST API, 那么以上配置需要写在 taosd 的配置文件 taos.cfg 中。如果使用 taosAdapter 提供的 REST API, 那么需要参考 taosAdapter [对应的配置方法](/reference/taosadapter/)。
:::
[taosAdapter](/reference/taosadapter/)
......@@ -65,7 +65,7 @@ REST 连接支持所有能运行 Go 的平台。
### 使用 go get 安装
`go get -u github.com/taosdata/driver-go/v2@develop`
`go get -u github.com/taosdata/driver-go/v3@latest`
### 使用 go mod 管理
......@@ -80,7 +80,7 @@ REST 连接支持所有能运行 Go 的平台。
```go
import (
"database/sql"
_ "github.com/taosdata/driver-go/v2/taosSql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
```
......@@ -132,7 +132,7 @@ import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosSql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
func main() {
......@@ -164,7 +164,7 @@ import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosRestful"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
......@@ -205,14 +205,14 @@ func main() {
### 更多示例程序
* [示例程序](https://github.com/taosdata/TDengine/tree/develop/examples/go)
* [示例程序](https://github.com/taosdata/driver-go/tree/3.0/examples)
* [视频教程](https://www.taosdata.com/blog/2020/11/11/1951.html)
## 使用限制
由于 REST 接口无状态所以 `use db` 语法不会生效,需要将 db 名称放到 SQL 语句中,如:`create table if not exists tb1 (ts timestamp, a int)`改为`create table if not exists test.tb1 (ts timestamp, a int)`否则将报错`[0x217] Database not specified or available`
也可以将 db 名称放到 DSN 中,将 `root:taosdata@http(localhost:6041)/` 改为 `root:taosdata@http(localhost:6041)/test`,此方法在 TDengine 2.4.0.5 版本的 taosAdapter 开始支持。当指定的 db 不存在时执行 `create database` 语句不会报错,而执行针对该 db 的其他查询或写入操作会报错。
也可以将 db 名称放到 DSN 中,将 `root:taosdata@http(localhost:6041)/` 改为 `root:taosdata@http(localhost:6041)/test`。当指定的 db 不存在时执行 `create database` 语句不会报错,而执行针对该 db 的其他查询或写入操作会报错。
完整示例如下:
......@@ -224,7 +224,7 @@ import (
"fmt"
"time"
_ "github.com/taosdata/driver-go/v2/taosRestful"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
......@@ -266,35 +266,27 @@ func main() {
## 常见问题
1. 无法找到包 `github.com/taosdata/driver-go/v2/taosRestful`
`go.mod` require 块对`github.com/taosdata/driver-go/v2`的引用改为`github.com/taosdata/driver-go/v2 develop`,之后执行 `go mod tidy`
2. database/sql stmt(参数绑定)相关接口崩溃
1. database/sql stmt(参数绑定)相关接口崩溃
REST 不支持参数绑定相关接口,建议使用`db.Exec``db.Query`
3. 使用 `use db` 语句后执行其他语句报错 `[0x217] Database not specified or available`
2. 使用 `use db` 语句后执行其他语句报错 `[0x217] Database not specified or available`
REST 接口中 SQL 语句的执行无上下文关联,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
4. 使用 taosSql 不报错使用 taosRestful 报错 `[0x217] Database not specified or available`
3. 使用 taosSql 不报错使用 taosRestful 报错 `[0x217] Database not specified or available`
因为 REST 接口无状态,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
5. 升级 `github.com/taosdata/driver-go/v2/taosRestful`
`go.mod` 文件中对 `github.com/taosdata/driver-go/v2` 的引用改为 `github.com/taosdata/driver-go/v2 develop`,之后执行 `go mod tidy`
6. `readBufferSize` 参数调大后无明显效果
4. `readBufferSize` 参数调大后无明显效果
`readBufferSize` 调大后会减少获取结果时 `syscall` 的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。
7. `disableCompression` 参数设置为 `false` 时查询效率降低
5. `disableCompression` 参数设置为 `false` 时查询效率降低
`disableCompression` 参数设置为 `false` 时查询结果会使用 `gzip` 压缩后传输,拿到数据后要先进行 `gzip` 解压。
8. `go get` 命令无法获取包,或者获取包超时
6. `go get` 命令无法获取包,或者获取包超时
设置 Go 代理 `go env -w GOPROXY=https://goproxy.cn,direct`
......@@ -334,17 +326,33 @@ func main() {
#### 订阅
* `func (conn *Connector) Subscribe(restart bool, topic string, sql string, interval time.Duration) (Subscriber, error)`
* `func NewConsumer(conf *Config) (*Consumer, error)`
创建消费者。
* `func (c *Consumer) Subscribe(topics []string) error`
订阅主题。
订阅数据。
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
* `func (s *taosSubscriber) Consume() (driver.Rows, error)`
轮询消息。
消费订阅数据,返回 `database/sql/driver` 包的 `Rows` 结构。
* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
* `func (s *taosSubscriber) Unsubscribe(keepProgress bool)`
提交消息。
取消订阅数据。
* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
释放消息。
* `func (c *Consumer) Unsubscribe() error`
取消订阅。
* `func (c *Consumer) Close() error`
关闭消费者。
#### schemaless
......@@ -366,10 +374,6 @@ func main() {
参数绑定单行插入。
* `func (conn *Connector) StmtQuery(sql string, params *param.Param) (rows driver.Rows, err error)`
参数绑定查询,返回 `database/sql/driver` 包的 `Rows` 结构。
* `func (conn *Connector) InsertStmt() *insertstmt.InsertStmt`
初始化参数。
......@@ -408,4 +412,4 @@ func main() {
## API 参考
全部 API [driver-go 文档](https://pkg.go.dev/github.com/taosdata/driver-go/v2)
全部 API [driver-go 文档](https://pkg.go.dev/github.com/taosdata/driver-go/v3)
......@@ -56,7 +56,6 @@ enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
......@@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
int32_t type; // data block load type:
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
......
......@@ -202,6 +202,7 @@ bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId);
bool fmIsSelectValueFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId);
bool fmIsClientPseudoColumnFunc(int32_t funcId);
......
......@@ -283,6 +283,7 @@ typedef struct SCreateIndexStmt {
EIndexType indexType;
bool ignoreExists;
char indexName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
SNodeList* pCols;
SIndexOptions* pOptions;
......
......@@ -34,11 +34,16 @@ typedef struct SUpdateInfo {
TSKEY minTS;
SScalableBf* pCloseWinSBF;
SHashObj* pMap;
STimeWindow scanWindow;
uint64_t scanGroupId;
uint64_t maxVersion;
} SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
......
......@@ -41,7 +41,7 @@ extern "C" {
#define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum {
......
......@@ -258,6 +258,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03C5)
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03C6)
#define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03C7)
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA TAOS_DEF_ERROR_CODE(0, 0x03C8)
// mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
......
......@@ -316,6 +316,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList);
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
......
......@@ -1231,9 +1231,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
}
dst->info.rows = src->info.rows;
dst->info.window = src->info.window;
dst->info.type = src->info.type;
dst->info = src->info;
return TSDB_CODE_SUCCESS;
}
......@@ -1708,9 +1706,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d|version:%" PRIu64 "\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid, pDataBlock->info.rows);
pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
......
......@@ -162,10 +162,7 @@ int32_t tNameGetDbName(const SName* name, char* dst) {
return 0;
}
const char* tNameGetDbNameP(const SName* name) {
return &name->dbname[0];
}
const char* tNameGetDbNameP(const SName* name) { return &name->dbname[0]; }
int32_t tNameGetFullDbName(const SName* name, char* dst) {
assert(name != NULL && dst != NULL);
......@@ -212,7 +209,6 @@ int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen) {
return 0;
}
int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
assert(dst != NULL);
dst->acctId = acctId;
......@@ -266,11 +262,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
char* start = (char*)((p == NULL) ? str : (p + 1));
int32_t len = 0;
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t)strlen(start);
if (TS_ESCAPE_CHAR == *start) {
++start;
char* end = start;
while ('`' != *end) {
++end;
}
len = end - start;
p = ++end;
} else {
len = (int32_t)(p - start);
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t)strlen(start);
} else {
len = (int32_t)(p - start);
}
}
// too long account id or too long db name
......@@ -288,6 +294,10 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
// too long account id or too long db name
int32_t len = (int32_t)strlen(start);
if (TS_ESCAPE_CHAR == *start) {
len -= 2;
++start;
}
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
}
......@@ -340,7 +350,7 @@ void buildChildTableName(RandTableName* rName) {
char temp[8] = {0};
rName->childTableName[0] = 't';
rName->childTableName[1] = '_';
for(int i = 0; i < 16; i++){
for (int i = 0; i < 16; i++) {
sprintf(temp, "%02x", context.digest[i]);
strcat(rName->childTableName, temp);
}
......
......@@ -37,8 +37,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId);
#ifdef __cplusplus
}
#endif
......
......@@ -1639,10 +1639,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walRetentionPeriod, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walRollPeriod, false);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walRetentionSize, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walRetentionSize, false);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walRollPeriod, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walSegmentSize, false);
......
......@@ -416,7 +416,7 @@ int32_t mndStart(SMnode *pMnode) {
}
mndSetRestore(pMnode, true);
}
grantReset(pMnode, TSDB_GRANT_ALL, 0);
return mndInitTimer(pMnode);
......@@ -446,20 +446,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return -1;
}
do {
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
static int64_t mndTick = 0;
if (++mndTick % 10 == 1) {
mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
}
if (gRaftDetailLog) {
char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
}
taosMemoryFree(syncNodeStr);
} while (0);
// ToDo: ugly! use function pointer
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
......
......@@ -687,6 +687,7 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
int32_t keepTime = tsShellActivityTimer * 3;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -700,6 +701,10 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
break;
}
if ((taosGetTimestampMs() - pConn->lastAccessTimeMs) > (keepTime * 1000)) {
continue;
}
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -45,7 +45,9 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen);
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen);
static int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {
......@@ -409,7 +411,8 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
return 0;
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void* alterOriData, int32_t alterOriDataLen) {
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen,
void *alterOriData, int32_t alterOriDataLen) {
SEncoder encoder = {0};
int32_t contLen;
SName name = {0};
......@@ -709,7 +712,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
pDst->createdTime = taosGetTimestampMs();
pDst->updateTime = pDst->createdTime;
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->uid =
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
pDst->dbUid = pDb->uid;
pDst->tagVer = 1;
pDst->colVer = 1;
......@@ -895,9 +899,9 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name);
if (cIndex >= 0){
if (cIndex >= 0) {
pSchema->colId = pStb->pColumns[cIndex].colId;
}else{
} else {
pSchema->colId = pDst->nextColId++;
}
}
......@@ -909,12 +913,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
pSchema->bytes = pField->bytes;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name);
if (cIndex >= 0){
if (cIndex >= 0) {
pSchema->colId = pStb->pTags[cIndex].colId;
}else{
} else {
pSchema->colId = pDst->nextColId++;
}
}
pDst->tagVer = createReq->tagVer;
pDst->colVer = createReq->colVer;
......@@ -982,7 +985,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
}
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
goto _OVER;
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
code = 0;
goto _OVER;
......@@ -1009,7 +1012,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
}
if (isAlter) {
bool needRsp = false;
bool needRsp = false;
SStbObj pDst = {0};
if (mndBuildStbFromAlter(pStb, &pDst, &createReq) != 0) {
taosMemoryFreeClear(pDst.pTags);
......@@ -1137,6 +1140,99 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return 0;
}
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
sdbRelease(pSdb, pTopic);
continue;
}
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
ASSERT(0);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId,
pCol->tableId, pTopic->ctbStbUid);
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
goto NEXT;
}
if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
return -1;
}
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
}
while (1) {
SSmaObj *pSma = NULL;
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname,
suid, colId, pSma->sql);
SNode *pAst = NULL;
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
pSma->name, stbname, suid, colId);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
mDebug("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
goto NEXT2;
}
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
sdbRelease(pSdb, pSma);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
return -1;
}
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
}
NEXT2:
sdbRelease(pSdb, pSma);
nodesDestroyNode(pAst);
}
return 0;
}
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
if (tag < 0) {
......@@ -1380,7 +1476,8 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
return 0;
}
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void* alterOriData, int32_t alterOriDataLen) {
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
int32_t alterOriDataLen) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
......@@ -1607,7 +1704,8 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
return 0;
}
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen) {
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
void *alterOriData, int32_t alterOriDataLen) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
......@@ -2204,12 +2302,12 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
char rollup[128 + VARSTR_HEADER_SIZE] = {0};
char rollup[128 + VARSTR_HEADER_SIZE] = {0};
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
for (int32_t i = 0; i < rollupNum; ++i) {
char *funcName = taosArrayGet(pStb->pFuncs, i);
if (i) {
strcat(varDataVal(rollup), ", ");
strcat(varDataVal(rollup), ", ");
}
strcat(varDataVal(rollup), funcName);
}
......
......@@ -72,56 +72,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
return strchr(topic, '.') + 1;
}
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
sdbRelease(pSdb, pTopic);
continue;
}
SNode *pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
ASSERT(0);
return -1;
}
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid);
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
goto NEXT;
}
if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
return -1;
}
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
}
NEXT:
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
}
return 0;
}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -137,6 +137,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTa
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
......
......@@ -78,7 +78,10 @@ struct SMeta {
TTB* pTagIdx;
TTB* pTtlIdx;
TTB* pSmaIdx;
TTB* pSmaIdx;
TTB* pTaskIdx;
SMetaIdx* pIdx;
};
......
......@@ -43,6 +43,7 @@ typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockL SBlockL;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
......@@ -414,6 +415,29 @@ struct SBlock {
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
};
struct SBlockL {
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} minKey;
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} maxKey;
int64_t minVer;
int64_t maxVer;
int32_t nRow;
int8_t cmprAlg;
int64_t offset;
int32_t szBlock;
int32_t szBlockCol;
int32_t szUid;
int32_t szVer;
int32_t szTSKEY;
};
struct SColData {
int16_t cid;
int8_t type;
......
......@@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
......@@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err;
}
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx);
if (ret < 0) {
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open index
if (metaOpenIdx(pMeta) < 0) {
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
......@@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
......@@ -162,6 +170,7 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
......@@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0;
}
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int32_t uid1 = *(int32_t *)pKey1;
int32_t uid2 = *(int32_t *)pKey2;
if (uid1 > uid2) {
return 1;
} else if (uid1 < uid2) {
return -1;
}
return 0;
}
......@@ -695,7 +695,7 @@ FAIL:
return -1;
}
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL;
bool failed = false;
SStreamDataSubmit* pSubmit = NULL;
......@@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue;
qDebug("data submit enqueue stream task: %d", pTask->taskId);
qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
......
......@@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0);
}
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) {
if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0);
}
......
......@@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
tqProcessStreamTrigger(pTq, data);
tqProcessStreamTrigger(pTq, data, ver);
}
return 0;
......
......@@ -36,16 +36,20 @@ typedef struct {
TSKEY minKey;
TSKEY maxKey;
// commit file data
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SBlockData oBlockData;
SDataFWriter *pWriter;
SArray *aBlockIdxN; // SArray<SBlockIdx>
SMapData nBlockMap; // SMapData<SBlock>
SBlockData nBlockData;
SSkmInfo skmTable;
SSkmInfo skmRow;
struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mBlock; // SMapData<SBlock>, read from reader
SBlockData bData;
} dReader;
struct {
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mBlock; // SMapData<SBlock>
SBlockData bData;
} dWriter;
SSkmInfo skmTable;
SSkmInfo skmRow;
/* commit del */
SDelFReader *pDelFReader;
SDelFWriter *pDelFWriter;
......@@ -276,16 +280,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->nextKey = TSKEY_MAX;
// old
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData);
taosArrayClear(pCommitter->dReader.aBlockIdx);
tMapDataReset(&pCommitter->dReader.mBlock);
tBlockDataReset(&pCommitter->dReader.bData);
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid},
tDFileSetCmprFn, TD_EQ);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
if (code) goto _err;
}
......@@ -296,9 +300,9 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
taosArrayClear(pCommitter->aBlockIdxN);
tMapDataReset(&pCommitter->nBlockMap);
tBlockDataReset(&pCommitter->nBlockData);
taosArrayClear(pCommitter->dWriter.aBlockIdx);
tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData);
if (pRSet) {
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
......@@ -320,7 +324,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
}
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err;
_exit:
......@@ -391,10 +395,11 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat
}
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
code =
tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
return code;
......@@ -407,8 +412,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
int8_t toDataOnly) {
int32_t code = 0;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockDataMerge = &pCommitter->dReader.bData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block;
SBlock *pBlock = &block;
TSDBROW *pRow1;
......@@ -416,7 +421,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
TSDBROW *pRow2 = &row2;
// read SBlockData
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
......@@ -513,7 +518,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
TSDBROW *pRow;
SBlock block;
SBlock *pBlock = &block;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
......@@ -575,14 +580,14 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
SBlock block;
if (pBlock->last) {
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, &pCommitter->dReader.bData, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
code = tsdbCommitBlockData(pCommitter, &pCommitter->dReader.bData, &block, pBlockIdx, 0);
if (code) goto _err;
} else {
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
}
......@@ -598,10 +603,10 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int6
SBlockIdx blockIdx = {.suid = suid, .uid = uid};
SBlockIdx *pBlockIdx = &blockIdx;
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -643,7 +648,7 @@ static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlock block;
TSDBROW *pRow;
......@@ -711,10 +716,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err;
nBlock = pCommitter->oBlockMap.nItem;
nBlock = pCommitter->dReader.mBlock.nItem;
ASSERT(nBlock > 0);
suid = pBlockIdx->suid;
......@@ -726,13 +731,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow == NULL && nBlock == 0) goto _exit;
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
tMapDataReset(&pCommitter->dWriter.mBlock);
SBlock block;
SBlock *pBlock = &block;
iBlock = 0;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -756,7 +761,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -771,7 +776,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -798,7 +803,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
SBlock nextBlock = {0};
tBlockReset(&nextBlock);
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock);
toKey = nextBlock.minKey;
}
......@@ -810,7 +815,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -822,7 +827,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -857,23 +862,23 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write blockIdx
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
if (code) goto _err;
// update file header
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
if (code) goto _err;
// upsert SDFileSet
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->pWriter->wSet);
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
if (code) goto _err;
// close and sync
code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
if (code) goto _err;
if (pCommitter->pReader) {
code = tsdbDataFReaderClose(&pCommitter->pReader);
if (pCommitter->dReader.pReader) {
code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
if (code) goto _err;
}
......@@ -898,14 +903,14 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iBlockIdx = 0;
int32_t nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx);
STbData *pTbData;
SBlockIdx *pBlockIdx;
ASSERT(nTbData > 0);
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
while (pTbData || pBlockIdx) {
if (pTbData && pBlockIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
......@@ -936,7 +941,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
continue;
_commit_table_mem_and_disk:
......@@ -944,7 +949,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
......@@ -958,8 +963,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
_err:
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbDataFReaderClose(&pCommitter->pReader);
tsdbDataFWriterClose(&pCommitter->pWriter, 0);
tsdbDataFReaderClose(&pCommitter->dReader.pReader);
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
return code;
}
......@@ -996,22 +1001,22 @@ _err:
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->aBlockIdx == NULL) {
pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->aBlockIdxN == NULL) {
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataInit(&pCommitter->oBlockData);
code = tBlockDataInit(&pCommitter->dReader.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->nBlockData);
code = tBlockDataInit(&pCommitter->dWriter.bData);
if (code) goto _exit;
_exit:
......@@ -1019,12 +1024,12 @@ _exit:
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData, 1);
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataClear(&pCommitter->dReader.bData, 1);
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataClear(&pCommitter->dWriter.bData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}
......
......@@ -2502,6 +2502,10 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
return metaGetIvtIdx(pMeta);
}
uint64_t getReaderMaxVersion(STsdbReader *pReader) {
return pReader->verRange.maxVer;
}
/**
* @brief Get all suids since suid
*
......
......@@ -30,12 +30,20 @@ void ctgFreeMsgSendParam(void* param) {
taosMemoryFree(param);
}
void ctgFreeBatchMsg(void* msg) {
if (NULL == msg) {
return;
}
SBatchMsg* pMsg = (SBatchMsg*)msg;
taosMemoryFree(pMsg->msg);
}
void ctgFreeBatch(SCtgBatch *pBatch) {
if (NULL == pBatch) {
return;
}
taosArrayDestroy(pBatch->pMsgs);
taosArrayDestroyEx(pBatch->pMsgs, ctgFreeBatchMsg);
taosArrayDestroy(pBatch->pTaskIds);
}
......
......@@ -437,6 +437,7 @@ typedef struct SessionWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint8_t parentType;
SAggSupporter* pIntervalAggSup;
} SessionWindowSupporter;
typedef struct STimeWindowSupp {
......@@ -1009,6 +1010,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
......
......@@ -672,6 +672,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
numOfRows = pfCtx->fpSet.process(pfCtx);
} else if (fmIsAggFunc(pfCtx->functionId)) {
// selective value output should be set during corresponding function execution
if (fmIsSelectValueFunc(pfCtx->functionId)) {
continue;
}
// _group_key function for "partition by tbname" + csum(col_name) query
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
int32_t slotId = pfCtx->param[0].pCol->slotId;
......
......@@ -1131,7 +1131,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
}
}
......@@ -1337,6 +1338,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
......@@ -1390,6 +1394,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
setBlockIntoRes(pInfo, &block);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
printDataBlock(pInfo->pRes, "stream scan ignore");
blockDataCleanup(pInfo->pRes);
continue;
}
if (pBlockInfo->rows > 0) {
break;
}
......@@ -1406,6 +1416,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
printDataBlock(pInfo->pRes, "stream scan");
if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
......
......@@ -1456,6 +1456,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
SDiskbasedBuf* pDiscBuf) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
......@@ -1772,10 +1773,11 @@ SSDataBlock* createDeleteBlock() {
return pBlock;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup.parentType = type;
pScanInfo->sessionSup.pIntervalAggSup = pSup;
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -1851,7 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup);
}
code = appendDownstream(pOperator, &downstream, 1);
......@@ -3111,7 +3113,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -2465,7 +2465,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "statecount",
.type = FUNCTION_TYPE_STATE_COUNT,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateStateCount,
.getEnvFunc = getStateFuncEnv,
.initFunc = functionSetup,
......@@ -2476,7 +2476,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "stateduration",
.type = FUNCTION_TYPE_STATE_DURATION,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateStateDuration,
.getEnvFunc = getStateFuncEnv,
.initFunc = functionSetup,
......@@ -2487,7 +2487,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "csum",
.type = FUNCTION_TYPE_CSUM,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
.translateFunc = translateCsum,
.getEnvFunc = getCsumFuncEnv,
.initFunc = functionSetup,
......@@ -2499,7 +2499,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "mavg",
.type = FUNCTION_TYPE_MAVG,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateMavg,
.getEnvFunc = getMavgFuncEnv,
.initFunc = mavgFunctionSetup,
......
......@@ -4651,10 +4651,15 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutput, i);
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, i);
}
continue;
}
bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param);
bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param);
int64_t output = -1;
if (ret) {
output = ++pInfo->count;
......@@ -4662,6 +4667,11 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
pInfo->count = 0;
}
colDataAppend(pOutput, i, (char*)&output, false);
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, i);
}
}
return numOfElems;
......@@ -4694,6 +4704,10 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutput, i);
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, i);
}
continue;
}
......@@ -4710,6 +4724,11 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
pInfo->durationStart = 0;
}
colDataAppend(pOutput, i, (char*)&output, false);
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, i);
}
}
return numOfElems;
......@@ -4762,6 +4781,11 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
}
}
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, pos);
}
numOfElems++;
}
......@@ -4834,6 +4858,11 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
colDataAppend(pOutput, pos, (char*)&result, false);
}
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityValue(pCtx, i, pos);
}
numOfElems++;
}
......
......@@ -221,6 +221,13 @@ bool fmIsLastRowFunc(int32_t funcId) {
return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type;
}
bool fmIsSelectValueFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_SELECT_VALUE == funcMgtBuiltins[funcId].type;
}
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
......
......@@ -745,6 +745,7 @@ void nodesDestroyNode(SNode* pNode) {
}
taosArrayDestroy(pQuery->pDbList);
taosArrayDestroy(pQuery->pTableList);
taosArrayDestroy(pQuery->pTargetTableList);
taosArrayDestroy(pQuery->pPlaceholderValues);
nodesDestroyNode(pQuery->pPrepareRoot);
break;
......
......@@ -177,7 +177,7 @@ SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* pRealTable, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding,
SNode* pStreamOptions);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName);
......
......@@ -33,6 +33,8 @@
} else {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INCOMPLETE_SQL);
}
} else if (TSDB_CODE_PAR_DB_NOT_SPECIFIED == pCxt->errCode && TK_NK_FLOAT == TOKEN.type) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, TOKEN.z);
}
}
......@@ -422,9 +424,7 @@ from_db_opt(A) ::= FROM db_name(B).
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, &B, NULL, C); }
//cmd ::= CREATE FULLTEXT INDEX not_exists_opt(D)
// index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, D, &A, &B, C, NULL); }
index_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, B, NULL, C); }
cmd ::= DROP INDEX exists_opt(B) index_name(A). { pCxt->pRootNode = createDropIndexStmt(pCxt, B, &A); }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
......
......@@ -1403,9 +1403,9 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
SNode* pRealTable, SNodeList* pCols, SNode* pOptions) {
CHECK_PARSER_STATUS(pCxt);
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName) || !checkDbName(pCxt, NULL, true)) {
if (!checkIndexName(pCxt, pIndexName)) {
return NULL;
}
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
......@@ -1413,7 +1413,9 @@ SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool igno
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
COPY_STRING_FORM_ID_TOKEN(pStmt->indexName, pIndexName);
COPY_STRING_FORM_ID_TOKEN(pStmt->tableName, pTableName);
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
pStmt->pCols = pCols;
pStmt->pOptions = (SIndexOptions*)pOptions;
return (SNode*)pStmt;
......
......@@ -244,7 +244,10 @@ static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableS
}
static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
......@@ -252,7 +255,11 @@ static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTabl
}
static int32_t collectMetaKeyFromAlterStable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromUseDatabase(SCollectMetaKeyCxt* pCxt, SUseDatabaseStmt* pStmt) {
......
......@@ -434,7 +434,7 @@ static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf,
}
static bool isNullStr(SToken* pToken) {
return ((pToken->type == TK_NK_STRING) && (pToken->n != 0) &&
return ((pToken->type == TK_NK_STRING) && (strlen(TSDB_DATA_NULL_STR_L) == pToken->n) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
......@@ -1175,11 +1175,6 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
getSTSRowAppendInfo(pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlocks, (const char*)&tsKey);
}
if (i < spd->numOfBound - 1) {
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_COMMA != sToken.type) {
......@@ -1188,6 +1183,9 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
}
}
TSKEY tsKey = TD_ROW_KEY(row);
checkTimestamp(pDataBlocks, (const char*)&tsKey);
if (!isParseBindParam) {
// set the null value for the columns that do not assign values
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
......
......@@ -398,6 +398,7 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
taosHashCleanup(pCxt->pDbs);
taosHashCleanup(pCxt->pTables);
taosHashCleanup(pCxt->pTargetTables);
}
static bool isSelectStmt(SNode* pCurrStmt) {
......@@ -885,8 +886,7 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
if (strict && (!IS_VAR_DATA_TYPE(pVal->node.resType.type) ||
pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
if (strict && (pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
......@@ -906,9 +906,6 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
break;
}
case TSDB_DATA_TYPE_NCHAR: {
if (strict && !IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
......@@ -4126,7 +4123,7 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
return NULL;
}
static int32_t checkAlterSuperTableImpl(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTableStmt* pStmt, STableMeta* pTableMeta) {
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
if (getNumOfTags(pTableMeta) == 1 && pTagsSchema->type == TSDB_DATA_TYPE_JSON &&
(pStmt->alterType == TSDB_ALTER_TABLE_ADD_TAG || pStmt->alterType == TSDB_ALTER_TABLE_DROP_TAG ||
......@@ -4152,11 +4149,16 @@ static int32_t checkAlterSuperTableImpl(STranslateContext* pCxt, SAlterTableStmt
}
static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pStmt) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Set tag value only available for child table");
}
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME == pStmt->alterType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Rename column only available for normal table");
}
if (pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_OPTIONS && -1 != pStmt->pOptions->ttl) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE);
}
......@@ -4169,10 +4171,21 @@ static int32_t checkAlterSuperTable(STranslateContext* pCxt, SAlterTableStmt* pS
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COL_JSON);
}
SDbCfgInfo dbCfg = {0};
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
if (TSDB_CODE_SUCCESS == code && NULL != dbCfg.pRetensions &&
(TSDB_ALTER_TABLE_ADD_COLUMN == pStmt->alterType || TSDB_ALTER_TABLE_DROP_COLUMN == pStmt->alterType ||
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE,
"Modifying the table schema is not supported in databases "
"configured with the 'RETENTIONS' option");
}
STableMeta* pTableMeta = NULL;
int32_t code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableImpl(pCxt, pStmt, pTableMeta);
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkAlterSuperTableBySchema(pCxt, pStmt, pTableMeta);
}
taosMemoryFree(pTableMeta);
return code;
......@@ -4310,9 +4323,8 @@ static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->indexName, &name), pReq->name);
strcpy(name.tname, pStmt->tableName);
name.tname[strlen(pStmt->tableName)] = '\0';
tNameExtractFullName(&name, pReq->stb);
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb);
pReq->igExists = pStmt->ignoreExists;
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
......@@ -4390,7 +4402,7 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateFullTextReq* pReq) {
// impl later
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
......@@ -4404,12 +4416,10 @@ static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateInde
}
static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
if (INDEX_TYPE_SMA == pStmt->indexType) {
return translateCreateSmaIndex(pCxt, pStmt);
} else if (INDEX_TYPE_FULLTEXT == pStmt->indexType) {
if (INDEX_TYPE_FULLTEXT == pStmt->indexType) {
return translateCreateFullTextIndex(pCxt, pStmt);
}
return TSDB_CODE_FAILED;
return translateCreateSmaIndex(pCxt, pStmt);
}
static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) {
......
......@@ -653,7 +653,7 @@ static int32_t reserveTableReqInCacheImpl(const char* pTbFName, int32_t len, SHa
static int32_t reserveTableReqInCache(int32_t acctId, const char* pDb, const char* pTable, SHashObj** pTables) {
char fullName[TSDB_TABLE_FNAME_LEN];
int32_t len = snprintf(fullName, sizeof(fullName), "%d.%s.%s", acctId, pDb, pTable);
int32_t len = snprintf(fullName, sizeof(fullName), "%d.`%s`.`%s`", acctId, pDb, pTable);
return reserveTableReqInCacheImpl(fullName, len, pTables);
}
......
此差异已折叠。
......@@ -2427,5 +2427,8 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubpla
}
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType && NULL == pLogicSubplan->pNode->pChildren) {
return TSDB_CODE_SUCCESS;
}
return applyOptimizeRule(pCxt, pLogicSubplan);
}
......@@ -779,8 +779,10 @@ static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName);
strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName);
strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
strcpy(pCol->colName, ((SColumnNode*)pExpr)->colName);
} else {
strcpy(pCol->colName, pExpr->aliasName);
}
strcpy(pCol->colName, pExpr->aliasName);
strcpy(pCol->node.aliasName, pExpr->aliasName);
pCol->node.resType = pExpr->resType;
return (SNode*)pCol;
......
......@@ -42,9 +42,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
}
if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan);
}
nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan);
......
......@@ -64,6 +64,12 @@ TEST_F(PlanSubqeuryTest, innerFill) {
"WHERE ts > '2022-04-06 00:00:00'");
}
TEST_F(PlanSubqeuryTest, innerOrderBy) {
useDb("root", "test");
run("SELECT c2 FROM (SELECT c2 FROM st1 ORDER BY c1, _rowts)");
}
TEST_F(PlanSubqeuryTest, outerInterval) {
useDb("root", "test");
......
此差异已折叠。
......@@ -140,13 +140,23 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
colDataDestroy(out.columnData);
taosMemoryFreeClear(out.columnData);
out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
cell = cell->pNext;
}
*data = pObj;
colDataDestroy(out.columnData);
taosMemoryFreeClear(out.columnData);
return TSDB_CODE_SUCCESS;
_return:
colDataDestroy(out.columnData);
taosMemoryFreeClear(out.columnData);
taosHashCleanup(pObj);
SCL_RET(code);
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册