Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
530c3a55
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
530c3a55
编写于
8月 07, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into refact/tsdb_last
上级
85556a3c
ef849670
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
187 addition
and
100 deletion
+187
-100
docs/examples/go/connect/afconn/main.go
docs/examples/go/connect/afconn/main.go
+2
-1
docs/examples/go/connect/cgoexample/main.go
docs/examples/go/connect/cgoexample/main.go
+2
-1
docs/examples/go/connect/restexample/main.go
docs/examples/go/connect/restexample/main.go
+2
-1
docs/examples/go/connect/wrapper/main.go
docs/examples/go/connect/wrapper/main.go
+0
-17
docs/examples/go/insert/json/main.go
docs/examples/go/insert/json/main.go
+3
-3
docs/examples/go/insert/line/main.go
docs/examples/go/insert/line/main.go
+2
-1
docs/examples/go/insert/sql/main.go
docs/examples/go/insert/sql/main.go
+10
-12
docs/examples/go/insert/stmt/main.go
docs/examples/go/insert/stmt/main.go
+1
-1
docs/examples/go/insert/telnet/main.go
docs/examples/go/insert/telnet/main.go
+3
-3
docs/examples/go/query/sync/main.go
docs/examples/go/query/sync/main.go
+5
-7
docs/zh/07-develop/03-insert-data/_go_stmt.mdx
docs/zh/07-develop/03-insert-data/_go_stmt.mdx
+1
-1
docs/zh/07-develop/07-tmq.md
docs/zh/07-develop/07-tmq.md
+41
-36
docs/zh/14-reference/11-docker/index.md
docs/zh/14-reference/11-docker/index.md
+1
-1
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+1
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+13
-3
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+47
-2
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+12
-0
tests/docs-examples-test/go.sh
tests/docs-examples-test/go.sh
+31
-0
tests/script/tsim/parser/function.sim
tests/script/tsim/parser/function.sim
+3
-3
tests/system-test/2-query/avg.py
tests/system-test/2-query/avg.py
+1
-1
tests/system-test/2-query/distribute_agg_apercentile.py
tests/system-test/2-query/distribute_agg_apercentile.py
+1
-1
tests/system-test/2-query/distribute_agg_max.py
tests/system-test/2-query/distribute_agg_max.py
+1
-1
tests/system-test/2-query/distribute_agg_min.py
tests/system-test/2-query/distribute_agg_min.py
+1
-1
tests/system-test/2-query/distribute_agg_spread.py
tests/system-test/2-query/distribute_agg_spread.py
+1
-1
tests/system-test/2-query/sample.py
tests/system-test/2-query/sample.py
+2
-2
未找到文件。
docs/examples/go/connect/afconn/main.go
浏览文件 @
530c3a55
...
...
@@ -2,6 +2,7 @@ package main
import
(
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
...
...
@@ -10,7 +11,7 @@ func main() {
conn
,
err
:=
af
.
Open
(
"localhost"
,
"root"
,
"taosdata"
,
""
,
6030
)
defer
conn
.
Close
()
if
err
!=
nil
{
fmt
.
Print
ln
(
"failed to connect, err:"
,
err
)
log
.
Fatal
ln
(
"failed to connect, err:"
,
err
)
}
else
{
fmt
.
Println
(
"connected"
)
}
...
...
docs/examples/go/connect/cgoexample/main.go
浏览文件 @
530c3a55
...
...
@@ -3,6 +3,7 @@ package main
import
(
"database/sql"
"fmt"
"log"
_
"github.com/taosdata/driver-go/v3/taosSql"
)
...
...
@@ -11,7 +12,7 @@ func main() {
var
taosDSN
=
"root:taosdata@tcp(localhost:6030)/"
taos
,
err
:=
sql
.
Open
(
"taosSql"
,
taosDSN
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"failed to connect TDengine, err:"
,
err
)
log
.
Fatal
ln
(
"failed to connect TDengine, err:"
,
err
)
return
}
fmt
.
Println
(
"Connected"
)
...
...
docs/examples/go/connect/restexample/main.go
浏览文件 @
530c3a55
...
...
@@ -3,6 +3,7 @@ package main
import
(
"database/sql"
"fmt"
"log"
_
"github.com/taosdata/driver-go/v3/taosRestful"
)
...
...
@@ -11,7 +12,7 @@ func main() {
var
taosDSN
=
"root:taosdata@http(localhost:6041)/"
taos
,
err
:=
sql
.
Open
(
"taosRestful"
,
taosDSN
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"failed to connect TDengine, err:"
,
err
)
log
.
Fatal
ln
(
"failed to connect TDengine, err:"
,
err
)
return
}
fmt
.
Println
(
"Connected"
)
...
...
docs/examples/go/connect/wrapper/main.go
已删除
100644 → 0
浏览文件 @
85556a3c
package
main
import
(
"fmt"
"github.com/taosdata/driver-go/v3/wrapper"
)
func
main
()
{
conn
,
err
:=
wrapper
.
TaosConnect
(
"localhost"
,
"root"
,
"taosdata"
,
""
,
6030
)
defer
wrapper
.
TaosClose
(
conn
)
if
err
!=
nil
{
fmt
.
Println
(
"fail to connect, err:"
,
err
)
}
else
{
fmt
.
Println
(
"connected"
)
}
}
docs/examples/go/insert/json/main.go
浏览文件 @
530c3a55
package
main
import
(
"
fmt
"
"
log
"
"github.com/taosdata/driver-go/v3/af"
)
...
...
@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func
main
()
{
conn
,
err
:=
af
.
Open
(
"localhost"
,
"root"
,
"taosdata"
,
""
,
6030
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"fail to connect, err:"
,
err
)
log
.
Fatal
ln
(
"fail to connect, err:"
,
err
)
}
defer
conn
.
Close
()
prepareDatabase
(
conn
)
...
...
@@ -32,6 +32,6 @@ func main() {
err
=
conn
.
OpenTSDBInsertJsonPayload
(
payload
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"insert error:"
,
err
)
log
.
Fatal
ln
(
"insert error:"
,
err
)
}
}
docs/examples/go/insert/line/main.go
浏览文件 @
530c3a55
...
...
@@ -2,6 +2,7 @@ package main
import
(
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
...
...
@@ -33,6 +34,6 @@ func main() {
err
=
conn
.
InfluxDBInsertLines
(
lines
,
"ms"
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"insert error:"
,
err
)
log
.
Fatal
ln
(
"insert error:"
,
err
)
}
}
docs/examples/go/insert/sql/main.go
浏览文件 @
530c3a55
...
...
@@ -3,6 +3,7 @@ package main
import
(
"database/sql"
"fmt"
"log"
_
"github.com/taosdata/driver-go/v3/taosRestful"
)
...
...
@@ -10,28 +11,26 @@ import (
func
createStable
(
taos
*
sql
.
DB
)
{
_
,
err
:=
taos
.
Exec
(
"CREATE DATABASE power"
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"failed to create database, err:"
,
err
)
log
.
Fatal
ln
(
"failed to create database, err:"
,
err
)
}
_
,
err
=
taos
.
Exec
(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"failed to create stable, err:"
,
err
)
log
.
Fatal
ln
(
"failed to create stable, err:"
,
err
)
}
}
func
insertData
(
taos
*
sql
.
DB
)
{
sql
:=
`INSERT INTO power.d1001 USING power.meters TAGS(
California.SanFrancisco
, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(
California.SanFrancisco
, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(
California.LosAngeles
, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(
California.LosAngeles
, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
sql
:=
`INSERT INTO power.d1001 USING power.meters TAGS(
'California.SanFrancisco'
, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(
'California.SanFrancisco'
, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(
'California.LosAngeles'
, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(
'California.LosAngeles'
, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
result
,
err
:=
taos
.
Exec
(
sql
)
if
err
!=
nil
{
fmt
.
Println
(
"failed to insert, err:"
,
err
)
return
log
.
Fatalln
(
"failed to insert, err:"
,
err
)
}
rowsAffected
,
err
:=
result
.
RowsAffected
()
if
err
!=
nil
{
fmt
.
Println
(
"failed to get affected rows, err:"
,
err
)
return
log
.
Fatalln
(
"failed to get affected rows, err:"
,
err
)
}
fmt
.
Println
(
"RowsAffected"
,
rowsAffected
)
}
...
...
@@ -40,8 +39,7 @@ func main() {
var
taosDSN
=
"root:taosdata@http(localhost:6041)/"
taos
,
err
:=
sql
.
Open
(
"taosRestful"
,
taosDSN
)
if
err
!=
nil
{
fmt
.
Println
(
"failed to connect TDengine, err:"
,
err
)
return
log
.
Fatalln
(
"failed to connect TDengine, err:"
,
err
)
}
defer
taos
.
Close
()
createStable
(
taos
)
...
...
docs/examples/go/insert/stmt/main.go
浏览文件 @
530c3a55
...
...
@@ -5,8 +5,8 @@ import (
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/param"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func
checkErr
(
err
error
,
prompt
string
)
{
...
...
docs/examples/go/insert/telnet/main.go
浏览文件 @
530c3a55
package
main
import
(
"
fmt
"
"
log
"
"github.com/taosdata/driver-go/v3/af"
)
...
...
@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func
main
()
{
conn
,
err
:=
af
.
Open
(
"localhost"
,
"root"
,
"taosdata"
,
""
,
6030
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"fail to connect, err:"
,
err
)
log
.
Fatal
ln
(
"fail to connect, err:"
,
err
)
}
defer
conn
.
Close
()
prepareDatabase
(
conn
)
...
...
@@ -37,6 +37,6 @@ func main() {
err
=
conn
.
OpenTSDBInsertTelnetLines
(
lines
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"insert error:"
,
err
)
log
.
Fatal
ln
(
"insert error:"
,
err
)
}
}
docs/examples/go/query/sync/main.go
浏览文件 @
530c3a55
...
...
@@ -2,7 +2,7 @@ package main
import
(
"database/sql"
"
fmt
"
"
log
"
"time"
_
"github.com/taosdata/driver-go/v3/taosRestful"
...
...
@@ -12,14 +12,12 @@ func main() {
var
taosDSN
=
"root:taosdata@http(localhost:6041)/power"
taos
,
err
:=
sql
.
Open
(
"taosRestful"
,
taosDSN
)
if
err
!=
nil
{
fmt
.
Println
(
"failed to connect TDengine, err:"
,
err
)
return
log
.
Fatalln
(
"failed to connect TDengine, err:"
,
err
)
}
defer
taos
.
Close
()
rows
,
err
:=
taos
.
Query
(
"SELECT ts, current FROM meters LIMIT 2"
)
if
err
!=
nil
{
fmt
.
Println
(
"failed to select from table, err:"
,
err
)
return
log
.
Fatalln
(
"failed to select from table, err:"
,
err
)
}
defer
rows
.
Close
()
...
...
@@ -30,9 +28,9 @@ func main() {
}
err
:=
rows
.
Scan
(
&
r
.
ts
,
&
r
.
current
)
if
err
!=
nil
{
fmt
.
Print
ln
(
"scan error:
\n
"
,
err
)
log
.
Fatal
ln
(
"scan error:
\n
"
,
err
)
return
}
fmt
.
Print
ln
(
r
.
ts
,
r
.
current
)
log
.
Fatal
ln
(
r
.
ts
,
r
.
current
)
}
}
docs/zh/07-develop/03-insert-data/_go_stmt.mdx
浏览文件 @
530c3a55
...
...
@@ -3,6 +3,6 @@
```
:::tip
driver-go 的模块 `github.com/taosdata/driver-go/v
2
/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
driver-go 的模块 `github.com/taosdata/driver-go/v
3
/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
:::
docs/zh/07-develop/07-tmq.md
浏览文件 @
530c3a55
---
sidebar_label
:
消息队列
description
:
"
数据订阅与推送服务。
连续
写入到
TDengine
中的时序数据能够被自动推送到订阅客户端。"
title
:
消息队列
sidebar_label
:
数据订阅
description
:
"
数据订阅与推送服务。写入到
TDengine
中的时序数据能够被自动推送到订阅客户端。"
title
:
数据订阅
---
基于数据天然的时间序列特性,TDengine 的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine 中每一张表均可视为一个标准的消息队列
。
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本
。
TDengine 内嵌支持消息订阅与推送服务(下文都简称TMQ)。使用系统提供的 API,用户可使用普通查询语句订阅数据库中的一张或多张表,或整个库。客户端启动订阅后,定时或按需轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户
。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量
。
TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上,支持自动提交和手动提交。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。
为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
## 主要数据结构和API
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
...
...
@@ -47,7 +53,9 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
这些 API 的文档请见
[
C/C++ Connector
](
/reference/connector/cpp
)
,下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在
[
tmq.c
](
https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c
)
看到。
一、首先完成建库、建一张超级表和多张子表,并每个子表插入若干条数据记录:
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
```
sql
drop
database
if
exists
tmqdb
;
...
...
@@ -63,14 +71,15 @@ insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
insert
into
tmqdb
.
ctb3
values
(
now
,
3
,
3
,
'a1'
)(
now
+
1
s
,
33
,
33
,
'a33'
);
```
二、
创建topic:
##
创建topic:
```
sql
create
topic
topicName
as
select
ts
,
c1
,
c2
,
c3
from
tmqdb
.
stb
where
c1
>
1
;
```
注:TMQ支持多种订阅类型:
1、列订阅
TMQ支持多种订阅类型:
### 列订阅
语法:CREATE TOPIC topic_name as subquery
通过select语句订阅(包括select
*
,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
...
...
@@ -79,25 +88,18 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
-
被订阅或用于计算的column和tag不可被删除、修改
-
若发生schema变更,新增的column不出现在结果中
2、
超级表订阅
###
超级表订阅
语法:CREATE TOPIC topic_name AS STABLE stbName
-
订阅某超级表的全部数据,schema变更不受限,schema变更后写入的数据将以最新schema返回
-
在tmq的返回消息中schema是块级别的,每块的schema可能不一样
-
列变更后写入的数据若未落盘,将以写入时的schema返回
-
列变更后写入的数据若已落盘,将以落盘时的schema返回
3、db订阅
语法:CREATE TOPIC topic_name AS DATABASE db_name
-
订阅某一db的全部数据,schema变更不受限
-
在tmq的返回消息中schema是块级别的,每块的schema可能不一样
-
列变更后写入的数据若未落盘,将以写入时的schema返回
-
列变更后写入的数据若已落盘,将以落盘时的schema返回
与select
*
from stbName订阅的区别是:
-
不会限制用户的schema变更
-
返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
-
用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
-
返回数据不带有tag
三、创建consumer
## 创建 consumer 以及consumer group
目前支持的config
:
对于consumer, 目前支持的config包括
:
| 参数名称 | 参数值 | 备注 |
| ---------------------------- | ------------------------------ | ------------------------------------------------------ |
...
...
@@ -121,7 +123,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_conf_set
(
conf
,
"group.id"
,
"cgrpName"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
...
...
@@ -131,7 +133,12 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return
tmq
;
```
四、创建订阅主题列表
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
## 创建 topic 列表
单个consumer支持同时订阅多个topic。
```
sql
tmq_list_t
*
topicList
=
tmq_list_new
();
...
...
@@ -139,9 +146,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return
topicList
;
```
单个consumer支持同时订阅多个topic。
五、启动订阅并开始消费
## 启动订阅并开始消费
```
sql
/* 启动订阅 */
...
...
@@ -151,9 +156,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
/* 循环poll消息 */
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
consumeDelay
=
5000
;
int32_t
timeOut
=
5000
;
while
(
running
)
{
TAOS_RES
*
tmqmsg
=
tmq_consumer_poll
(
tmq
,
consumeDelay
);
TAOS_RES
*
tmqmsg
=
tmq_consumer_poll
(
tmq
,
timeOut
);
if
(
tmqmsg
)
{
msgCnt
++
;
totalRows
+=
msg_process
(
tmqmsg
);
...
...
@@ -190,7 +195,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
rows
++
;
rows
++
;
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"row content from %s: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"null table"
),
buf
);
}
...
...
@@ -199,7 +204,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
}
```
五、
结束消费
##
结束消费
```
sql
/* 取消订阅 */
...
...
@@ -209,7 +214,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_consumer_close
(
tmq
);
```
六、
删除topic
##
删除topic
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
...
...
@@ -218,7 +223,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
drop
topic
topicName
;
```
七、
状态查看
##
状态查看
1、topics:查询已经创建的topic
...
...
docs/zh/14-reference/11-docker/index.md
浏览文件 @
530c3a55
...
...
@@ -147,7 +147,7 @@ import (
"fmt"
"time"
_
"github.com/taosdata/driver-go/v
2
/taosSql"
_
"github.com/taosdata/driver-go/v
3
/taosSql"
)
type
config
struct
{
...
...
include/libs/function/functionMgt.h
浏览文件 @
530c3a55
...
...
@@ -202,6 +202,7 @@ bool fmIsForbidStreamFunc(int32_t funcId);
bool
fmIsIntervalInterpoFunc
(
int32_t
funcId
);
bool
fmIsInterpFunc
(
int32_t
funcId
);
bool
fmIsLastRowFunc
(
int32_t
funcId
);
bool
fmIsReturnNotNullFunc
(
int32_t
funcId
);
bool
fmIsSelectValueFunc
(
int32_t
funcId
);
bool
fmIsSystemInfoFunc
(
int32_t
funcId
);
bool
fmIsImplicitTsFunc
(
int32_t
funcId
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
530c3a55
...
...
@@ -1437,7 +1437,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo
->
groupId
=
groupId
;
}
static
void
doUpdateNumOfRows
(
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
)
{
static
void
doUpdateNumOfRows
(
SqlFunctionCtx
*
pCtx
,
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
)
{
bool
returnNotNull
=
false
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
struct
SResultRowEntryInfo
*
pResInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowCellOffset
);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
...
...
@@ -1447,6 +1448,15 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
if
(
pRow
->
numOfRows
<
pResInfo
->
numOfRes
)
{
pRow
->
numOfRows
=
pResInfo
->
numOfRes
;
}
if
(
fmIsReturnNotNullFunc
(
pCtx
[
j
].
functionId
))
{
returnNotNull
=
true
;
}
}
// if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
// except for first/last, which require not null output, output no rows
if
(
pRow
->
numOfRows
==
0
&&
!
returnNotNull
)
{
pRow
->
numOfRows
=
1
;
}
}
...
...
@@ -1458,7 +1468,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
doUpdateNumOfRows
(
pRow
,
numOfExprs
,
rowCellOffset
);
doUpdateNumOfRows
(
p
Ctx
,
p
Row
,
numOfExprs
,
rowCellOffset
);
if
(
pRow
->
numOfRows
==
0
)
{
releaseBufPage
(
pBuf
,
page
);
return
0
;
...
...
@@ -1514,7 +1524,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
pPos
->
pos
.
offset
);
doUpdateNumOfRows
(
pRow
,
numOfExprs
,
rowCellOffset
);
doUpdateNumOfRows
(
p
Ctx
,
p
Row
,
numOfExprs
,
rowCellOffset
);
if
(
pRow
->
numOfRows
==
0
)
{
pGroupResInfo
->
index
+=
1
;
releaseBufPage
(
pBuf
,
page
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
530c3a55
...
...
@@ -64,6 +64,9 @@ typedef struct SMinmaxResInfo {
bool
assign
;
// assign the first value or not
int64_t
v
;
STuplePos
tuplePos
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
}
SMinmaxResInfo
;
typedef
struct
STopBotResItem
{
...
...
@@ -75,6 +78,10 @@ typedef struct STopBotResItem {
typedef
struct
STopBotRes
{
int32_t
maxSize
;
int16_t
type
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
STopBotResItem
*
pItems
;
}
STopBotRes
;
...
...
@@ -221,6 +228,10 @@ typedef struct SSampleInfo {
int32_t
numSampled
;
uint8_t
colType
;
int16_t
colBytes
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
char
*
data
;
STuplePos
*
tuplePos
;
}
SSampleInfo
;
...
...
@@ -1134,6 +1145,9 @@ bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
SMinmaxResInfo
*
buf
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
buf
->
assign
=
false
;
buf
->
tuplePos
.
pageId
=
-
1
;
buf
->
nullTupleSaved
=
false
;
buf
->
nullTuplePos
.
pageId
=
-
1
;
return
true
;
}
...
...
@@ -1575,6 +1589,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
_min_max_over:
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pBuf
->
nullTupleSaved
)
{
doSaveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pBuf
->
nullTuplePos
);
pBuf
->
nullTupleSaved
=
true
;
}
return
numOfElems
;
}
...
...
@@ -1615,7 +1633,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if
(
pEntryInfo
->
numOfRes
>
0
)
{
setSelectivityValue
(
pCtx
,
pBlock
,
&
pRes
->
tuplePos
,
currentRow
);
}
else
{
set
NullSelectivityValue
(
pCtx
,
pBlock
,
currentRow
);
set
SelectivityValue
(
pCtx
,
pBlock
,
&
pRes
->
nullTuplePos
,
currentRow
);
}
return
pEntryInfo
->
numOfRes
;
...
...
@@ -3366,6 +3384,8 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pRes
->
maxSize
=
pCtx
->
param
[
1
].
param
.
i
;
pRes
->
nullTupleSaved
=
false
;
pRes
->
nullTuplePos
.
pageId
=
-
1
;
return
true
;
}
...
...
@@ -3403,6 +3423,10 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult
(
pCtx
,
data
,
i
,
pCtx
->
pSrcBlock
,
pRes
->
type
,
pInput
->
uid
,
pResInfo
,
true
);
}
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pRes
->
nullTupleSaved
)
{
doSaveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pRes
->
nullTuplePos
);
pRes
->
nullTupleSaved
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3427,6 +3451,11 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult
(
pCtx
,
data
,
i
,
pCtx
->
pSrcBlock
,
pRes
->
type
,
pInput
->
uid
,
pResInfo
,
false
);
}
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pRes
->
nullTupleSaved
)
{
doSaveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pRes
->
nullTuplePos
);
pRes
->
nullTupleSaved
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3625,6 +3654,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// todo assign the tag value and the corresponding row data
int32_t
currentRow
=
pBlock
->
info
.
rows
;
if
(
pEntryInfo
->
numOfRes
<=
0
)
{
colDataAppendNULL
(
pCol
,
currentRow
);
setSelectivityValue
(
pCtx
,
pBlock
,
&
pRes
->
nullTuplePos
,
currentRow
);
return
pEntryInfo
->
numOfRes
;
}
for
(
int32_t
i
=
0
;
i
<
pEntryInfo
->
numOfRes
;
++
i
)
{
STopBotResItem
*
pItem
=
&
pRes
->
pItems
[
i
];
if
(
type
==
TSDB_DATA_TYPE_FLOAT
)
{
...
...
@@ -4897,7 +4931,8 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
pInfo
->
numSampled
=
0
;
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
pInfo
->
nullTuplePos
.
pageId
=
-
1
;
pInfo
->
nullTupleSaved
=
false
;
pInfo
->
data
=
(
char
*
)
pInfo
+
sizeof
(
SSampleInfo
);
pInfo
->
tuplePos
=
(
STuplePos
*
)((
char
*
)
pInfo
+
sizeof
(
SSampleInfo
)
+
pInfo
->
samples
*
pInfo
->
colBytes
);
...
...
@@ -4943,6 +4978,11 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
doReservoirSample
(
pCtx
,
pInfo
,
data
,
i
);
}
if
(
pInfo
->
numSampled
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pInfo
->
nullTupleSaved
)
{
doSaveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pInfo
->
nullTuplePos
);
pInfo
->
nullTupleSaved
=
true
;
}
SET_VAL
(
pResInfo
,
pInfo
->
numSampled
,
pInfo
->
numSampled
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4957,6 +4997,11 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
int32_t
currentRow
=
pBlock
->
info
.
rows
;
if
(
pInfo
->
numSampled
==
0
)
{
colDataAppendNULL
(
pCol
,
currentRow
);
setSelectivityValue
(
pCtx
,
pBlock
,
&
pInfo
->
nullTuplePos
,
currentRow
);
return
pInfo
->
numSampled
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numSampled
;
++
i
)
{
colDataAppend
(
pCol
,
currentRow
+
i
,
pInfo
->
data
+
i
*
pInfo
->
colBytes
,
false
);
setSelectivityValue
(
pCtx
,
pBlock
,
&
pInfo
->
tuplePos
[
i
],
currentRow
+
i
);
...
...
source/libs/function/src/functionMgt.c
浏览文件 @
530c3a55
...
...
@@ -221,6 +221,18 @@ bool fmIsLastRowFunc(int32_t funcId) {
return
FUNCTION_TYPE_LAST_ROW
==
funcMgtBuiltins
[
funcId
].
type
;
}
bool
fmIsReturnNotNullFunc
(
int32_t
funcId
)
{
if
(
funcId
<
0
||
funcId
>=
funcMgtBuiltinsNum
)
{
return
false
;
}
return
FUNCTION_TYPE_LAST
==
funcMgtBuiltins
[
funcId
].
type
||
FUNCTION_TYPE_LAST_PARTIAL
==
funcMgtBuiltins
[
funcId
].
type
||
FUNCTION_TYPE_LAST_MERGE
==
funcMgtBuiltins
[
funcId
].
type
||
FUNCTION_TYPE_FIRST
==
funcMgtBuiltins
[
funcId
].
type
||
FUNCTION_TYPE_FIRST_PARTIAL
==
funcMgtBuiltins
[
funcId
].
type
||
FUNCTION_TYPE_FIRST_MERGE
==
funcMgtBuiltins
[
funcId
].
type
;
}
bool
fmIsSelectValueFunc
(
int32_t
funcId
)
{
if
(
funcId
<
0
||
funcId
>=
funcMgtBuiltinsNum
)
{
return
false
;
...
...
tests/docs-examples-test/go.sh
0 → 100644
浏览文件 @
530c3a55
#!/bin/bash
set
-e
taosd
>>
/dev/null 2>&1 &
taosadapter
>>
/dev/null 2>&1 &
cd
../../docs/examples/go
go mod tidy
go run ./connect/afconn/main.go
go run ./connect/cgoexample/main.go
go run ./connect/restexample/main.go
taos
-s
"drop database if exists test"
go run ./insert/json/main.go
taos
-s
"drop database if exists test"
go run ./insert/line/main.go
taos
-s
"drop database if exists power"
go run ./insert/sql/main.go
taos
-s
"drop database if exists power"
go run ./insert/stmt/main.go
taos
-s
"drop database if exists test"
go run ./insert/telnet/main.go
go run ./query/sync/main.go
taos
-s
"drop topic if exists example_tmq_topic"
taos
-s
"drop database if exists example_tmq"
go run ./sub/main.go
tests/script/tsim/parser/function.sim
浏览文件 @
530c3a55
...
...
@@ -327,8 +327,8 @@ endi
print =================>td-2610
sql select stddev(k) from tm2 where ts='2020-12-29 18:46:19.109'
if $rows !=
0
then
print expect
0
, actual:$rows
if $rows !=
1
then
print expect
1
, actual:$rows
return -1
endi
sql select twa(k) from tm2 where ts='2020-12-29 18:46:19.109'
...
...
@@ -406,7 +406,7 @@ if $data00 != 1.378704626 then
endi
sql select stddev(c) from m1
if $rows !=
0
then
if $rows !=
1
then
return -1
endi
...
...
tests/system-test/2-query/avg.py
浏览文件 @
530c3a55
...
...
@@ -295,7 +295,7 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
0
,
4.500000000
)
tdSql
.
query
(
f
" select avg(c1) from
{
dbname
}
.stb1 where c1 is null "
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
def
avg_func_filter
(
self
,
dbname
=
"db"
):
...
...
tests/system-test/2-query/distribute_agg_apercentile.py
浏览文件 @
530c3a55
...
...
@@ -86,7 +86,7 @@ class TDTestCase:
def
distribute_agg_query
(
self
,
dbname
=
"testdb"
):
# basic filter
tdSql
.
query
(
f
"select apercentile(c1 , 20) from
{
dbname
}
.stb1 where c1 is null"
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select apercentile(c1 , 20) from
{
dbname
}
.stb1 where t1=1"
)
tdSql
.
checkData
(
0
,
0
,
2.800000000
)
...
...
tests/system-test/2-query/distribute_agg_max.py
浏览文件 @
530c3a55
...
...
@@ -168,7 +168,7 @@ class TDTestCase:
def
distribute_agg_query
(
self
,
dbname
=
"testdb"
):
# basic filter
tdSql
.
query
(
f
"select max(c1) from
{
dbname
}
.stb1 where c1 is null"
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select max(c1) from
{
dbname
}
.stb1 where t1=1"
)
tdSql
.
checkData
(
0
,
0
,
10
)
...
...
tests/system-test/2-query/distribute_agg_min.py
浏览文件 @
530c3a55
...
...
@@ -167,7 +167,7 @@ class TDTestCase:
def
distribute_agg_query
(
self
,
dbname
=
"testdb"
):
# basic filter
tdSql
.
query
(
f
"select min(c1) from
{
dbname
}
.stb1 where c1 is null"
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select min(c1) from
{
dbname
}
.stb1 where t1=1"
)
tdSql
.
checkData
(
0
,
0
,
2
)
...
...
tests/system-test/2-query/distribute_agg_spread.py
浏览文件 @
530c3a55
...
...
@@ -195,7 +195,7 @@ class TDTestCase:
def
distribute_agg_query
(
self
):
# basic filter
tdSql
.
query
(
"select spread(c1) from stb1 where c1 is null"
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select spread(c1) from stb1 where t1=1"
)
tdSql
.
checkData
(
0
,
0
,
8.000000000
)
...
...
tests/system-test/2-query/sample.py
浏览文件 @
530c3a55
...
...
@@ -743,7 +743,7 @@ class TDTestCase:
# filter data
tdSql
.
query
(
" select sample(c1, 20 ) from t1 where c1 is null "
)
tdSql
.
checkRows
(
0
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
" select sample(c1, 20 ) from t1 where c1 =6 "
)
tdSql
.
checkRows
(
1
)
...
...
@@ -891,4 +891,4 @@ class TDTestCase:
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
\ No newline at end of file
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录