diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index ab1609f35f15cd625cb2346ffe80ffad7330658e..13b247770ea7eef6b64209ca98787ff6d733bf85 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG 69eee2e
+ GIT_TAG 213f8b3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index bc3fe878847685022d3490278f0d2880fa3ef3e0..cddc5aee4ea1d33c98833d134a507587e7dd6bdf 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG f80dd7e
+ GIT_TAG 723f696
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx
index 17b3f5caa062eaacb4216b7153e899040e702cc1..94a9dbffbd2bc5199edf6f2a6e4561355d967705 100644
--- a/docs/en/07-develop/07-tmq.mdx
+++ b/docs/en/07-develop/07-tmq.mdx
@@ -117,19 +117,22 @@ class TaosConsumer():
```go
-func NewConsumer(conf *Config) (*Consumer, error)
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
-func (c *Consumer) Close() error
-
-func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+// rebalanceCb is reserved for compatibility purpose
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
-func (c *Consumer) FreeMessage(message unsafe.Pointer)
+// rebalanceCb is reserved for compatibility purpose
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
-func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
-func (c *Consumer) Subscribe(topics []string) error
+// tmq.TopicPartition is reserved for compatibility purpose
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
+
+func (c *Consumer) Close() error
```
@@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer {
```go
-config := tmq.NewConfig()
-defer config.Destroy()
-err = config.SetGroupID("test")
-if err != nil {
- panic(err)
-}
-err = config.SetAutoOffsetReset("earliest")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectIP("127.0.0.1")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectUser("root")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPass("taosdata")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPort("6030")
-if err != nil {
- panic(err)
-}
-err = config.SetMsgWithTableName(true)
-if err != nil {
- panic(err)
-}
-err = config.EnableHeartBeat()
-if err != nil {
- panic(err)
-}
-err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
- }
-})
-if err != nil {
- panic(err)
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
}
+consumer, err := NewConsumer(conf)
```
@@ -523,11 +496,7 @@ consumer.subscribe(topics);
```go
-consumer, err := tmq.NewConsumer(config)
-if err != nil {
- panic(err)
-}
-err = consumer.Subscribe([]string{"example_tmq_topic"})
+err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@@ -611,13 +580,17 @@ while(running){
```go
for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- fmt.Println(result)
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
}
```
@@ -729,7 +702,11 @@ consumer.close();
```go
-consumer.Close()
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
+
+/* Close consumer */
+_ = consumer.Close()
```
diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx
index df5b129cea552144d5833190d46e8a78f2fd2fa5..60407c0735bf9bcb42ae54bddcc9afa639a02fcc 100644
--- a/docs/en/14-reference/03-connector/05-go.mdx
+++ b/docs/en/14-reference/03-connector/05-go.mdx
@@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man
#### Subscribe
-* `func NewConsumer(conf *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group.
-* `func (c *Consumer) Subscribe(topics []string) error`
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
+
+Subscribes a topic.
+
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics.
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information.
-* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
-* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
-
-Free information.
-
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
@@ -441,25 +444,36 @@ Close consumer.
### Subscribe via WebSocket
-* `func NewConsumer(config *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
- Creates consumer group.
+Creates consumer group.
+
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
-* `func (c *Consumer) Subscribe(topic []string) error`
+Subscribes a topic.
- Subscribes to topics.
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+Subscribes to topics.
- Polling information.
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
-* `func (c *Consumer) Commit(messageID uint64) error`
+Polling information.
- Commit information.
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+Note: `tmq.TopicPartition` is reserved for compatibility purpose
+
+Commit information.
+
+* `func (c *Consumer) Unsubscribe() error`
+
+Unsubscribe.
* `func (c *Consumer) Close() error`
- Close consumer.
+Close consumer.
For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)
diff --git a/docs/examples/go/go.mod b/docs/examples/go/go.mod
index 2bc1a74cb6ef14221fa384701773dc73fe3b161d..6696852312f39b93df189beba89f08c1945ef9f1 100644
--- a/docs/examples/go/go.mod
+++ b/docs/examples/go/go.mod
@@ -2,5 +2,5 @@ module goexample
go 1.17
-require github.com/taosdata/driver-go/v3 3.0
+require github.com/taosdata/driver-go/v3 3.1.0
diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go
index a13d394a2c5009c1ad88684109b6f16b4d8a0540..1f7218936fbe457615562ded1b938daca95225cb 100644
--- a/docs/examples/go/sub/main.go
+++ b/docs/examples/go/sub/main.go
@@ -1,17 +1,12 @@
package main
import (
- "context"
- "encoding/json"
"fmt"
- "strconv"
- "time"
+ "os"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
- "github.com/taosdata/driver-go/v3/common"
- "github.com/taosdata/driver-go/v3/errors"
- "github.com/taosdata/driver-go/v3/wrapper"
+ tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
func main() {
@@ -28,79 +23,56 @@ func main() {
if err != nil {
panic(err)
}
- config := tmq.NewConfig()
- defer config.Destroy()
- err = config.SetGroupID("test")
if err != nil {
panic(err)
}
- err = config.SetAutoOffsetReset("earliest")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectIP("127.0.0.1")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectUser("root")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectPass("taosdata")
+ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_client",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
+ })
if err != nil {
panic(err)
}
- err = config.SetConnectPort("6030")
+ err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
- err = config.SetMsgWithTableName(true)
+ _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
- err = config.EnableHeartBeat()
+ _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
- err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
+ for i := 0; i < 5; i++ {
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.String())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- })
- if err != nil {
- panic(err)
}
- consumer, err := tmq.NewConsumer(config)
+ err = consumer.Unsubscribe()
if err != nil {
panic(err)
}
- err = consumer.Subscribe([]string{"example_tmq_topic"})
+ err = consumer.Close()
if err != nil {
panic(err)
}
- _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
- if err != nil {
- panic(err)
- }
- _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
- if err != nil {
- panic(err)
- }
- for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
- }
- if result.Type != common.TMQ_RES_DATA {
- panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
- }
- data, _ := json.Marshal(result.Data)
- fmt.Println(string(data))
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
- break
- }
- consumer.Close()
}
diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index 1f5a089aaa2a051e238eedc0315c37cad643b33f..039e9eb6356f55d2cadd83cd4c3ce5e3082c04d1 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -115,19 +115,22 @@ class TaosConsumer():
```go
-func NewConsumer(conf *Config) (*Consumer, error)
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
-func (c *Consumer) Close() error
-
-func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
-func (c *Consumer) FreeMessage(message unsafe.Pointer)
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
-func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
-func (c *Consumer) Subscribe(topics []string) error
+// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
+
+func (c *Consumer) Close() error
```
@@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer {
```go
-config := tmq.NewConfig()
-defer config.Destroy()
-err = config.SetGroupID("test")
-if err != nil {
- panic(err)
-}
-err = config.SetAutoOffsetReset("earliest")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectIP("127.0.0.1")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectUser("root")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPass("taosdata")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPort("6030")
-if err != nil {
- panic(err)
-}
-err = config.SetMsgWithTableName(true)
-if err != nil {
- panic(err)
-}
-err = config.EnableHeartBeat()
-if err != nil {
- panic(err)
-}
-err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
- }
-})
-if err != nil {
- panic(err)
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
}
+consumer, err := NewConsumer(conf)
```
@@ -532,11 +505,7 @@ consumer.subscribe(topics);
```go
-consumer, err := tmq.NewConsumer(config)
-if err != nil {
- panic(err)
-}
-err = consumer.Subscribe([]string{"example_tmq_topic"})
+err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@@ -620,13 +589,17 @@ while(running){
```go
for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- fmt.Println(result)
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
}
```
@@ -738,7 +711,11 @@ consumer.close();
```go
-consumer.Close()
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
+
+/* Close consumer */
+_ = consumer.Close()
```
diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx
index 0fc4007f6362697222b425c8c2c803b911b9ac8a..2aa1a58e49f34b412f12bd0d67586dc6e56cf0bc 100644
--- a/docs/zh/08-connector/20-go.mdx
+++ b/docs/zh/08-connector/20-go.mdx
@@ -15,7 +15,7 @@ import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx"
-`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
+`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
`driver-go` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。
@@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。
```text
username:password@protocol(address)/dbname?param=value
```
+
### 使用连接器进行连接
@@ -176,6 +177,7 @@ func main() {
}
}
```
+
@@ -207,6 +209,7 @@ func main() {
}
}
```
+
@@ -357,33 +360,32 @@ func main() {
#### 订阅
-* `func NewConsumer(conf *Config) (*Consumer, error)`
-
-创建消费者。
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
-* `func (c *Consumer) Subscribe(topics []string) error`
+ 创建消费者。
-订阅主题。
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+ 订阅单个主题。
-轮询消息。
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
-* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
+ 订阅主题。
-提交消息。
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
-* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
+ 轮询消息。
-释放消息。
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
-* `func (c *Consumer) Unsubscribe() error`
-
-取消订阅。
+ 提交消息。
* `func (c *Consumer) Close() error`
-关闭消费者。
+ 关闭连接。
#### schemaless
@@ -443,25 +445,32 @@ func main() {
### 通过 WebSocket 订阅
-* `func NewConsumer(config *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
+
+ 创建消费者。
+
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
- 创建消费者。
+ 订阅单个主题。
-* `func (c *Consumer) Subscribe(topic []string) error`
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
- 订阅主题。
+ 订阅主题。
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
- 轮询消息。
+ 轮询消息。
-* `func (c *Consumer) Commit(messageID uint64) error`
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
- 提交消息。
+ 提交消息。
* `func (c *Consumer) Close() error`
- 关闭消费者。
+ 关闭连接。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)
diff --git a/include/client/taos.h b/include/client/taos.h
index 838d0e826662abe5d2fbd6253601a12f06978c75..cf410a42daf1e9c401af767497a603aa12c7a536 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -208,6 +208,7 @@ DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
+DLL_EXPORT int taos_get_current_db(TAOS *taos, char *database, int len, int *required);
DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
DLL_EXPORT int taos_errno(TAOS_RES *res);
diff --git a/include/common/systable.h b/include/common/systable.h
index 6f65c1e8b870d4a42427173bf3ea17ae7ade0ce1..9b5f66f64c6fb41c7479c726ab02dc96d08e8ef5 100644
--- a/include/common/systable.h
+++ b/include/common/systable.h
@@ -36,6 +36,7 @@ extern "C" {
#define TSDB_INS_TABLE_STABLES "ins_stables"
#define TSDB_INS_TABLE_TABLES "ins_tables"
#define TSDB_INS_TABLE_TAGS "ins_tags"
+#define TSDB_INS_TABLE_COLS "ins_columns"
#define TSDB_INS_TABLE_TABLE_DISTRIBUTED "ins_table_distributed"
#define TSDB_INS_TABLE_USERS "ins_users"
#define TSDB_INS_TABLE_LICENCES "ins_grants"
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 800f9e2eb7b8e9806d1c64eb6829c1191851439f..1ee6c043ee769fba9112f165ece9ab30330c5034 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -115,6 +115,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMS,
TSDB_MGMT_TABLE_TABLE,
TSDB_MGMT_TABLE_TAG,
+ TSDB_MGMT_TABLE_COL,
TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VGROUP,
@@ -343,7 +344,8 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
-#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
+#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
+#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_FLAGS(s) ((s)->flags)
@@ -381,6 +383,13 @@ static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
}
}
+static FORCE_INLINE void tDeleteSSchemaWrapperForHash(void* pSchemaWrapper) {
+ if (pSchemaWrapper != NULL && *(SSchemaWrapper**)pSchemaWrapper != NULL) {
+ taosMemoryFree((*(SSchemaWrapper**)pSchemaWrapper)->pSchema);
+ taosMemoryFree(*(SSchemaWrapper**)pSchemaWrapper);
+ }
+}
+
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
@@ -1392,6 +1401,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN];
char user[TSDB_USER_LEN];
+ char filterTb[TSDB_TABLE_NAME_LEN];
int64_t showId;
} SRetrieveTableReq;
@@ -1771,7 +1781,9 @@ typedef struct {
// 3.0.20
int64_t checkpointFreq; // ms
// 3.0.2.3
- int8_t createStb;
+ int8_t createStb;
+ uint64_t targetStbUid;
+ SArray* fillNullCols;
} SCMCreateStreamReq;
typedef struct {
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index bbf332c4d4c5972d5007a77838c81065764d9a4f..57ddeb657cb6c583e8a97dc089b735d903f2347e 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -207,6 +207,12 @@ typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
+typedef struct SColLocation {
+ int16_t slotId;
+ col_id_t colId;
+ int8_t type;
+} SColLocation;
+
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index e1df2a4540b500a53e88147c3eb22affcecb301c..1179f4d23a024c5657e6cc082143a919bcb7683c 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -435,6 +435,22 @@ const char *taos_data_type(int type) {
return "TSDB_DATA_TYPE_NCHAR";
case TSDB_DATA_TYPE_JSON:
return "TSDB_DATA_TYPE_JSON";
+ case TSDB_DATA_TYPE_UTINYINT:
+ return "TSDB_DATA_TYPE_UTINYINT";
+ case TSDB_DATA_TYPE_USMALLINT:
+ return "TSDB_DATA_TYPE_USMALLINT";
+ case TSDB_DATA_TYPE_UINT:
+ return "TSDB_DATA_TYPE_UINT";
+ case TSDB_DATA_TYPE_UBIGINT:
+ return "TSDB_DATA_TYPE_UBIGINT";
+ case TSDB_DATA_TYPE_VARBINARY:
+ return "TSDB_DATA_TYPE_VARBINARY";
+ case TSDB_DATA_TYPE_DECIMAL:
+ return "TSDB_DATA_TYPE_DECIMAL";
+ case TSDB_DATA_TYPE_BLOB:
+ return "TSDB_DATA_TYPE_BLOB";
+ case TSDB_DATA_TYPE_MEDIUMBLOB:
+ return "TSDB_DATA_TYPE_MEDIUMBLOB";
default:
return "UNKNOWN";
}
@@ -675,6 +691,32 @@ const char *taos_get_server_info(TAOS *taos) {
return pTscObj->sDetailVer;
}
+int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
+ STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
+ if (pTscObj == NULL) {
+ terrno = TSDB_CODE_TSC_DISCONNECTED;
+ return -1;
+ }
+
+ int code = TSDB_CODE_SUCCESS;
+ taosThreadMutexLock(&pTscObj->mutex);
+ if(database == NULL || len <= 0){
+ if(required != NULL) *required = strlen(pTscObj->db) + 1;
+ terrno = TSDB_CODE_INVALID_PARA;
+ code = -1;
+ }else if(len < strlen(pTscObj->db) + 1){
+ tstrncpy(database, pTscObj->db, len);
+ if(required) *required = strlen(pTscObj->db) + 1;
+ terrno = TSDB_CODE_INVALID_PARA;
+ code = -1;
+ }else{
+ strcpy(database, pTscObj->db);
+ code = 0;
+ }
+ taosThreadMutexUnlock(&pTscObj->mutex);
+ return code;
+}
+
static void destoryTablesReq(void *p) {
STablesReq *pRes = (STablesReq *)p;
taosArrayDestroy(pRes->pTables);
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index 6c86743b696e23ddcdc403860ecafaeb2caa6448..8791a81bbe7fbed021c13b772892f11ca109a41d 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -178,6 +178,18 @@ static const SSysDbTableSchema userTagsSchema[] = {
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
+static const SSysDbTableSchema userColsSchema[] = {
+ {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "table_type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "col_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "col_type", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "col_length", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+ {.name = "col_precision", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+ {.name = "col_scale", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
+ {.name = "col_nullable", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}
+};
+
static const SSysDbTableSchema userTblDistSchema[] = {
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
@@ -294,6 +306,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_STABLES, userStbsSchema, tListLen(userStbsSchema), false},
{TSDB_INS_TABLE_TABLES, userTblsSchema, tListLen(userTblsSchema), false},
{TSDB_INS_TABLE_TAGS, userTagsSchema, tListLen(userTagsSchema), false},
+ {TSDB_INS_TABLE_COLS, userColsSchema, tListLen(userColsSchema), false},
// {TSDB_INS_TABLE_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
{TSDB_INS_TABLE_USERS, userUsersSchema, tListLen(userUsersSchema), false},
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema), true},
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 2c4fc987504bd389ee9634ba71cf26be72ebc656..87ad592afb3103c31a98c866bac745d861dd51c0 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -3191,6 +3191,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
+ if (tEncodeCStr(&encoder, pReq->filterTb) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
tEndEncode(&encoder);
@@ -3207,6 +3208,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
+ if (tDecodeCStrTo(&decoder, pReq->filterTb) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
tEndDecode(&decoder);
@@ -5425,6 +5427,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
+ if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
tEndEncode(&encoder);
@@ -5486,6 +5489,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
+ if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
tEndDecode(&decoder);
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 2f824b48b4b22eb6c758d05ee30787099278a9f2..1cbd9bfb66b92b63dd2a01285b609eed671805f7 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -444,6 +444,7 @@ typedef struct {
STableMetaRsp* pMeta;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
+ char filterTb[TSDB_TABLE_NAME_LEN];
} SShowObj;
typedef struct {
diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c
index 7a8de4099f0a11f313552a6f2629c797fda06f39..48d8e89bfe73e47dd89e75f8c13626ebd3d1ecf4 100644
--- a/source/dnode/mnode/impl/src/mndShow.c
+++ b/source/dnode/mnode/impl/src/mndShow.c
@@ -19,6 +19,7 @@
#include "systable.h"
#define SHOW_STEP_SIZE 100
+#define SHOW_COLS_STEP_SIZE 4096
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
static void mndFreeShowObj(SShowObj *pShow);
@@ -76,6 +77,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, len) == 0) {
type = TSDB_MGMT_TABLE_TAG;
+ } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, len) == 0) {
+ type = TSDB_MGMT_TABLE_COL;
} else if (strncasecmp(name, TSDB_INS_TABLE_TABLE_DISTRIBUTED, len) == 0) {
// type = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USERS, len) == 0) {
@@ -131,6 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
+ strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
@@ -190,13 +194,15 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
int32_t rowsToRead = SHOW_STEP_SIZE;
int32_t size = 0;
int32_t rowsRead = 0;
-
+ mDebug("mndProcessRetrieveSysTableReq start");
SRetrieveTableReq retrieveReq = {0};
if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
+ mDebug("mndProcessRetrieveSysTableReq tb:%s", retrieveReq.tb);
+
if (retrieveReq.showId == 0) {
STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
if (pMeta == NULL) {
@@ -226,6 +232,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
}
}
+ if(pShow->type == TSDB_MGMT_TABLE_COL){ // expend capacity for ins_columns
+ rowsToRead = SHOW_COLS_STEP_SIZE;
+ }
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false);
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index d504a94700096162d636390ee6eb5e2d2f77f139..c243e83a15333c877434c07ed961c1ce2f5af9e2 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -43,6 +43,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
+static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
@@ -69,10 +70,14 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
+// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
+ mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COL, mndRetrieveStbCol);
+ mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_COL, mndCancelGetNextStb);
+
return sdbSetTable(pMnode->pSdb, table);
}
@@ -2489,6 +2494,283 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
}
}
+//static int32_t mndProcessRetrieveStbReq(SRpcMsg *pReq) {
+// SMnode *pMnode = pReq->info.node;
+// SShowMgmt *pMgmt = &pMnode->showMgmt;
+// SShowObj *pShow = NULL;
+// int32_t rowsToRead = SHOW_STEP_SIZE;
+// int32_t rowsRead = 0;
+//
+// SRetrieveTableReq retrieveReq = {0};
+// if (tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
+// terrno = TSDB_CODE_INVALID_MSG;
+// return -1;
+// }
+//
+// SMnode *pMnode = pReq->info.node;
+// SSdb *pSdb = pMnode->pSdb;
+// int32_t numOfRows = 0;
+// SDbObj *pDb = NULL;
+// ESdbStatus objStatus = 0;
+//
+// SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
+// if (pUser == NULL) return 0;
+// bool sysinfo = pUser->sysInfo;
+//
+// // Append the information_schema database into the result.
+//// if (!pShow->sysDbRsp) {
+//// SDbObj infoschemaDb = {0};
+//// setInformationSchemaDbCfg(pMnode, &infoschemaDb);
+//// size_t numOfTables = 0;
+//// getVisibleInfosTablesNum(sysinfo, &numOfTables);
+//// mndDumpDbInfoData(pMnode, pBlock, &infoschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
+////
+//// numOfRows += 1;
+////
+//// SDbObj perfschemaDb = {0};
+//// setPerfSchemaDbCfg(pMnode, &perfschemaDb);
+//// numOfTables = 0;
+//// getPerfDbMeta(NULL, &numOfTables);
+//// mndDumpDbInfoData(pMnode, pBlock, &perfschemaDb, pShow, numOfRows, numOfTables, true, 0, 1);
+////
+//// numOfRows += 1;
+//// pShow->sysDbRsp = true;
+//// }
+//
+// SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
+// blockDataEnsureCapacity(p, rowsToRead);
+//
+// size_t size = 0;
+// const SSysTableMeta* pSysDbTableMeta = NULL;
+//
+// getInfosDbMeta(&pSysDbTableMeta, &size);
+// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB);
+//
+// getPerfDbMeta(&pSysDbTableMeta, &size);
+// p->info.rows = buildDbColsInfoBlock(sysinfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
+//
+// blockDataDestroy(p);
+//
+//
+// while (numOfRows < rowsToRead) {
+// pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus, true);
+// if (pShow->pIter == NULL) break;
+// if (strncmp(retrieveReq.db, pDb->name, strlen(retrieveReq.db)) != 0){
+// continue;
+// }
+// if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
+// continue;
+// }
+//
+// while (numOfRows < rowsToRead) {
+// pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
+// if (pShow->pIter == NULL) break;
+//
+// if (pDb != NULL && pStb->dbUid != pDb->uid) {
+// sdbRelease(pSdb, pStb);
+// continue;
+// }
+//
+// cols = 0;
+//
+// SName name = {0};
+// char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+// mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
+// varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
+//
+// SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
+//
+// char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+// tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
+// tNameGetDbName(&name, varDataVal(db));
+// varDataSetLen(db, strlen(varDataVal(db)));
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)db, false);
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// if (pStb->commentLen > 0) {
+// char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
+// STR_TO_VARSTR(comment, pStb->comment);
+// colDataAppend(pColInfo, numOfRows, comment, false);
+// } else if (pStb->commentLen == 0) {
+// char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
+// STR_TO_VARSTR(comment, "");
+// colDataAppend(pColInfo, numOfRows, comment, false);
+// } else {
+// colDataAppendNULL(pColInfo, numOfRows);
+// }
+//
+// char watermark[64 + VARSTR_HEADER_SIZE] = {0};
+// sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]);
+// varDataSetLen(watermark, strlen(varDataVal(watermark)));
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)watermark, false);
+//
+// char maxDelay[64 + VARSTR_HEADER_SIZE] = {0};
+// sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]);
+// varDataSetLen(maxDelay, strlen(varDataVal(maxDelay)));
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
+//
+// char rollup[160 + VARSTR_HEADER_SIZE] = {0};
+// int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
+// char *sep = ", ";
+// int32_t sepLen = strlen(sep);
+// int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
+// for (int32_t i = 0; i < rollupNum; ++i) {
+// char *funcName = taosArrayGet(pStb->pFuncs, i);
+// if (i) {
+// strncat(varDataVal(rollup), sep, rollupLen);
+// rollupLen -= sepLen;
+// }
+// strncat(varDataVal(rollup), funcName, rollupLen);
+// rollupLen -= strlen(funcName);
+// }
+// varDataSetLen(rollup, strlen(varDataVal(rollup)));
+//
+// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+// colDataAppend(pColInfo, numOfRows, (const char *)rollup, false);
+//
+// numOfRows++;
+// sdbRelease(pSdb, pStb);
+// }
+//
+// if (pDb != NULL) {
+// mndReleaseDb(pMnode, pDb);
+// }
+//
+// sdbRelease(pSdb, pDb);
+// }
+//
+// pShow->numOfRows += numOfRows;
+// mndReleaseUser(pMnode, pUser);
+//
+//
+//
+//
+//
+//
+//
+//
+// ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
+// if (retrieveFp == NULL) {
+// mndReleaseShowObj(pShow, false);
+// terrno = TSDB_CODE_MSG_NOT_PROCESSED;
+// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
+// return -1;
+// }
+//
+// mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
+// if (retrieveReq.user[0] != 0) {
+// memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
+// } else {
+// memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
+// }
+// if (retrieveReq.db[0] && mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db) != 0) {
+// return -1;
+// }
+//
+// int32_t numOfCols = pShow->pMeta->numOfColumns;
+//
+// SSDataBlock *pBlock = createDataBlock();
+// for (int32_t i = 0; i < numOfCols; ++i) {
+// SColumnInfoData idata = {0};
+//
+// SSchema *p = &pShow->pMeta->pSchemas[i];
+//
+// idata.info.bytes = p->bytes;
+// idata.info.type = p->type;
+// idata.info.colId = p->colId;
+// blockDataAppendColInfo(pBlock, &idata);
+// }
+//
+// blockDataEnsureCapacity(pBlock, rowsToRead);
+//
+// if (mndCheckRetrieveFinished(pShow)) {
+// mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
+// rowsRead = 0;
+// } else {
+// rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
+// if (rowsRead < 0) {
+// terrno = rowsRead;
+// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
+// mndReleaseShowObj(pShow, true);
+// blockDataDestroy(pBlock);
+// return -1;
+// }
+//
+// pBlock->info.rows = rowsRead;
+// mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
+// }
+//
+// size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
+// blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
+//
+// SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
+// if (pRsp == NULL) {
+// mndReleaseShowObj(pShow, false);
+// terrno = TSDB_CODE_OUT_OF_MEMORY;
+// mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
+// blockDataDestroy(pBlock);
+// return -1;
+// }
+//
+// pRsp->handle = htobe64(pShow->id);
+//
+// if (rowsRead > 0) {
+// char *pStart = pRsp->data;
+// SSchema *ps = pShow->pMeta->pSchemas;
+//
+// *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
+// pStart += sizeof(int32_t); // number of columns
+//
+// for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
+// SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
+// pSchema->bytes = htonl(ps[i].bytes);
+// pSchema->colId = htons(ps[i].colId);
+// pSchema->type = ps[i].type;
+//
+// pStart += sizeof(SSysTableSchema);
+// }
+//
+// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
+// }
+//
+// pRsp->numOfRows = htonl(rowsRead);
+// pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
+// pReq->info.rsp = pRsp;
+// pReq->info.rspLen = size;
+//
+// if (rowsRead == 0 || rowsRead < rowsToRead) {
+// pRsp->completed = 1;
+// mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
+// mndReleaseShowObj(pShow, true);
+// } else {
+// mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
+// mndReleaseShowObj(pShow, false);
+// }
+//
+// blockDataDestroy(pBlock);
+// return TSDB_CODE_SUCCESS;
+//}
+
+
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@@ -2599,6 +2881,187 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return numOfRows;
}
+static int32_t buildDbColsInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
+ const char* dbName, const char* tbName) {
+ char tName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ char dName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ int32_t numOfRows = p->info.rows;
+
+ STR_TO_VARSTR(dName, dbName);
+ STR_TO_VARSTR(typeName, "SYSTEM_TABLE");
+
+ for (int32_t i = 0; i < size; ++i) {
+ const SSysTableMeta* pm = &pSysDbTableMeta[i];
+// if (pm->sysInfo) {
+// continue;
+// }
+ if(tbName[0] && strncmp(tbName, pm->name, TSDB_TABLE_NAME_LEN) != 0){
+ continue;
+ }
+
+ STR_TO_VARSTR(tName, pm->name);
+
+ for(int32_t j = 0; j < pm->colNum; j++){
+ // table name
+ SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
+ colDataAppend(pColInfoData, numOfRows, tName, false);
+
+ // database name
+ pColInfoData = taosArrayGet(p->pDataBlock, 1);
+ colDataAppend(pColInfoData, numOfRows, dName, false);
+
+ pColInfoData = taosArrayGet(p->pDataBlock, 2);
+ colDataAppend(pColInfoData, numOfRows, typeName, false);
+
+ // col name
+ char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(colName, pm->schema[j].name);
+ pColInfoData = taosArrayGet(p->pDataBlock, 3);
+ colDataAppend(pColInfoData, numOfRows, colName, false);
+
+ // col type
+ int8_t colType = pm->schema[j].type;
+ pColInfoData = taosArrayGet(p->pDataBlock, 4);
+ char colTypeStr[VARSTR_HEADER_SIZE + 32];
+ int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
+ if (colType == TSDB_DATA_TYPE_VARCHAR) {
+ colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)(pm->schema[j].bytes - VARSTR_HEADER_SIZE));
+ } else if (colType == TSDB_DATA_TYPE_NCHAR) {
+ colTypeLen += sprintf(
+ varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)((pm->schema[j].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
+ }
+ varDataSetLen(colTypeStr, colTypeLen);
+ colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false);
+
+ pColInfoData = taosArrayGet(p->pDataBlock, 5);
+ colDataAppend(pColInfoData, numOfRows, (const char*)&pm->schema[j].bytes, false);
+ for (int32_t k = 6; k <= 8; ++k) {
+ pColInfoData = taosArrayGet(p->pDataBlock, k);
+ colDataAppendNULL(pColInfoData, numOfRows);
+ }
+
+ numOfRows += 1;
+ }
+ }
+
+ return numOfRows;
+}
+
+static int32_t buildSysDbColsInfo(SSDataBlock* p, char* db, char* tb) {
+ size_t size = 0;
+ const SSysTableMeta* pSysDbTableMeta = NULL;
+
+ if(db[0] && strncmp(db, TSDB_INFORMATION_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0 && strncmp(db, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0){
+ return p->info.rows;
+ }
+
+ getInfosDbMeta(&pSysDbTableMeta, &size);
+ p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
+
+ getPerfDbMeta(&pSysDbTableMeta, &size);
+ p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
+
+ return p->info.rows;
+}
+
+static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
+ SMnode *pMnode = pReq->info.node;
+ SSdb *pSdb = pMnode->pSdb;
+ SStbObj *pStb = NULL;
+
+ int32_t numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
+ mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
+ SDbObj *pDb = NULL;
+ if (strlen(pShow->db) > 0) {
+ pDb = mndAcquireDb(pMnode, pShow->db);
+ if (pDb == NULL) return terrno;
+ }
+
+ char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(typeName, "SUPER_TABLE");
+ while (numOfRows < rows) {
+ pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
+ if (pShow->pIter == NULL) break;
+
+ if (pDb != NULL && pStb->dbUid != pDb->uid) {
+ sdbRelease(pSdb, pStb);
+ continue;
+ }
+
+ SName name = {0};
+ char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
+ if(pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0){
+ sdbRelease(pSdb, pStb);
+ continue;
+ }
+ varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
+
+ mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
+
+ char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
+ tNameGetDbName(&name, varDataVal(db));
+ varDataSetLen(db, strlen(varDataVal(db)));
+
+ for(int i = 0; i < pStb->numOfColumns; i++){
+ int32_t cols = 0;
+ SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, (const char *)stbName, false);
+
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, (const char *)db, false);
+
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, typeName, false);
+
+ // col name
+ char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(colName, pStb->pColumns[i].name);
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, colName, false);
+
+ // col type
+ int8_t colType = pStb->pColumns[i].type;
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ char colTypeStr[VARSTR_HEADER_SIZE + 32];
+ int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
+ if (colType == TSDB_DATA_TYPE_VARCHAR) {
+ colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
+ } else if (colType == TSDB_DATA_TYPE_NCHAR) {
+ colTypeLen += sprintf(
+ varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
+ }
+ varDataSetLen(colTypeStr, colTypeLen);
+ colDataAppend(pColInfo, numOfRows, (char*)colTypeStr, false);
+
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppend(pColInfo, numOfRows, (const char*)&pStb->pColumns[i].bytes, false);
+ while(cols < pShow->numOfColumns) {
+ pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+ colDataAppendNULL(pColInfo, numOfRows);
+ }
+ numOfRows++;
+ }
+
+ sdbRelease(pSdb, pStb);
+ }
+
+ if (pDb != NULL) {
+ mndReleaseDb(pMnode, pDb);
+ }
+
+ pShow->numOfRows += numOfRows;
+ mDebug("mndRetrieveStbCol success, rows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows);
+
+ return numOfRows;
+}
+
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 8a435c48872f31a7026a62bf30993214c2918201..6b54a36a6fc3b4135fb19f1bbe40ee13cdecf7c5 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -314,7 +314,11 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
}
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
- pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
+ if (pCreate->createStb == STREAM_CREATE_STABLE_TRUE) {
+ pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
+ } else {
+ pObj->targetStbUid = pCreate->targetStbUid;
+ }
pObj->targetDbUid = pTargetDb->uid;
mndReleaseDb(pMnode, pTargetDb);
@@ -334,6 +338,38 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
goto FAIL;
}
+ int32_t numOfNULL = taosArrayGetSize(pCreate->fillNullCols);
+ if(numOfNULL > 0) {
+ pObj->outputSchema.nCols += numOfNULL;
+ SSchema* pFullSchema = taosMemoryCalloc(pObj->outputSchema.nCols, sizeof(SSchema));
+ if (!pFullSchema) {
+ goto FAIL;
+ }
+
+ int32_t nullIndex = 0;
+ int32_t dataIndex = 0;
+ for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
+ SColLocation* pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
+ if (i < pos->slotId) {
+ pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
+ pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
+ pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
+ strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
+ pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
+ dataIndex++;
+ } else {
+ pFullSchema[i].bytes = 0;
+ pFullSchema[i].colId = pos->colId;
+ pFullSchema[i].flags = COL_SET_NULL;
+ memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
+ pFullSchema[i].type = pos->type;
+ nullIndex++;
+ }
+ }
+ taosMemoryFree(pObj->outputSchema.pSchema);
+ pObj->outputSchema.pSchema = pFullSchema;
+ }
+
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index b1093e0f23e15ab5330028f0690d3e2aa124704d..3f3e287bb9f6b450ad9470d9935a76c65cc03e19 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -152,7 +152,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
-int32_t metaTbCursorNext(SMTbCursor *pTbCur);
+int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
// tsdb
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index def5dc22b3818d97656b1f51a46951e64051c381..4b280a32f1f5915876dbeb8fd0ff4b6beaa4c658 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -310,7 +310,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
}
-int metaTbCursorNext(SMTbCursor *pTbCur) {
+int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
STbCfg tbCfg;
@@ -324,7 +324,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
- if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
+ if (pTbCur->mr.me.type == jumpTableType) {
continue;
}
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index cc60283c58a1edad3e7de8fe8db869f93e6f3058..f1103ad48a1be4101da041e40ddf1e7b4a6b181b 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -323,19 +323,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
taosArrayDestroy(tagArray);
}
-static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
+static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0;
- SVCreateTbBatchReq reqs = {0};
- reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
- if (NULL == reqs.pArray) {
- ret = -1;
- goto end;
- }
- taosArrayPush(reqs.pArray, req);
- reqs.nReqs = 1;
-
- tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
+ tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
if (ret < 0) {
ret = -1;
goto end;
@@ -350,7 +341,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
- if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
+ if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf);
*pBuf = NULL;
*contLen = 0;
@@ -361,14 +352,13 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, vo
tEncoderClear(&coder);
end:
- taosArrayDestroy(reqs.pArray);
return ret;
}
-int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
+int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL;
int32_t tlen = 0;
- encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
+ encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = {
.msgType = TDMT_VND_CREATE_TABLE,
@@ -387,6 +377,7 @@ _error:
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
+
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
@@ -402,6 +393,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
void* pBuf = NULL;
SArray* tagArray = NULL;
SArray* pVals = NULL;
+ SArray* crTblArray = NULL;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
@@ -442,8 +434,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
+ SVCreateTbBatchReq reqs = {0};
+ crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
+ if (NULL == reqs.pArray) {
+ goto _end;
+ }
for (int32_t rowId = 0; rowId < rows; rowId++) {
- SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
+ SVCreateTbReq createTbReq = {0};
+ SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) {
goto _end;
}
@@ -511,6 +509,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
+
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
@@ -524,13 +523,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
}
-
- if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
- goto _end;
- }
- tdDestroySVCreateTbReq(pCreateTbReq);
- taosMemoryFreeClear(pCreateTbReq);
+ taosArrayPush(reqs.pArray, pCreateTbReq);
}
+ reqs.nReqs = taosArrayGetSize(reqs.pArray);
+ if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
+ goto _end;
+ }
+ tagArray = taosArrayDestroy(tagArray);
+ taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
+ crTblArray = NULL;
} else {
SSubmitTbData tbData = {0};
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
@@ -579,7 +580,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
goto _end;
}
STagVal tagVal = {
- .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
+ .cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
@@ -638,28 +639,37 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t j = 0; j < rows; j++) {
taosArrayClear(pVals);
+ int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
- SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (k == 0) {
+ SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
- if (colDataIsNull_s(pColData, j)) {
+ if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
- } else {
- void* colData = colDataGetData(pColData, j);
- if (IS_STR_DATA_TYPE(pCol->type)) {
- SValue sv =
- (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
- SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
+ } else{
+ SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
+ if (colDataIsNull_s(pColData, j)) {
+ SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
+ dataIndex++;
} else {
- SValue sv;
- memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
- SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
- taosArrayPush(pVals, &cv);
+ void* colData = colDataGetData(pColData, j);
+ if (IS_STR_DATA_TYPE(pCol->type)) {
+ SValue sv =
+ (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value
+ SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
+ taosArrayPush(pVals, &cv);
+ } else {
+ SValue sv;
+ memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
+ SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
+ taosArrayPush(pVals, &cv);
+ }
+ dataIndex++;
}
}
}
@@ -716,5 +726,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
+ taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
// TODO: change
}
diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c
index 05570eda2fb3000327b6debe295582caae2cb59b..ddcaaf2c72778d1a8488ee353722bef2e337fb42 100644
--- a/source/libs/executor/src/sysscanoperator.c
+++ b/source/libs/executor/src/sysscanoperator.c
@@ -140,6 +140,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
SMetaReader* smrChildTable, const char* dbname, const char* tableName,
int32_t* pNumOfRows, const SSDataBlock* dataBlock);
+static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
+ int32_t* pNumOfRows, const SSDataBlock* dataBlock,
+ char* tName, SSchemaWrapper* schemaRow, char* tableType);
+
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo);
@@ -413,6 +417,176 @@ static bool sysTableIsCondOnOneTable(SNode* pCond, char* condTable) {
return false;
}
+static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
+ qDebug("sysTableScanUserCols get cols start");
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ SSysTableScanInfo* pInfo = pOperator->info;
+ if (pOperator->status == OP_EXEC_DONE) {
+ return NULL;
+ }
+
+ blockDataCleanup(pInfo->pRes);
+ int32_t numOfRows = 0;
+
+ SSDataBlock* dataBlock = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_COLS);
+ blockDataEnsureCapacity(dataBlock, pOperator->resultInfo.capacity);
+
+ const char* db = NULL;
+ int32_t vgId = 0;
+ vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
+
+ SName sn = {0};
+ char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
+
+ tNameGetDbName(&sn, varDataVal(dbname));
+ varDataSetLen(dbname, strlen(varDataVal(dbname)));
+
+ // optimize when sql like where table_name='tablename' and xxx.
+ if (pInfo->req.filterTb[0]) {
+ char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(tableName, pInfo->req.filterTb);
+
+ SMetaReader smrTable = {0};
+ metaReaderInit(&smrTable, pInfo->readHandle.meta, 0);
+ int32_t code = metaGetTableEntryByName(&smrTable, pInfo->req.filterTb);
+ if (code != TSDB_CODE_SUCCESS) {
+ // terrno has been set by metaGetTableEntryByName, therefore, return directly
+ metaReaderClear(&smrTable);
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows = 0;
+ return NULL;
+ }
+
+ if (smrTable.me.type == TSDB_SUPER_TABLE) {
+ metaReaderClear(&smrTable);
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows = 0;
+ return NULL;
+ }
+
+ if (smrTable.me.type == TSDB_CHILD_TABLE) {
+ int64_t suid = smrTable.me.ctbEntry.suid;
+ metaReaderClear(&smrTable);
+ metaReaderInit(&smrTable, pInfo->readHandle.meta, 0);
+ code = metaGetTableEntryByUid(&smrTable, suid);
+ if (code != TSDB_CODE_SUCCESS) {
+ // terrno has been set by metaGetTableEntryByName, therefore, return directly
+ metaReaderClear(&smrTable);
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows = 0;
+ return NULL;
+ }
+ }
+
+ char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ SSchemaWrapper *schemaRow = NULL;
+ if(smrTable.me.type == TSDB_SUPER_TABLE){
+ schemaRow = &smrTable.me.stbEntry.schemaRow;
+ STR_TO_VARSTR(typeName, "CHILD_TABLE");
+ }else if(smrTable.me.type == TSDB_NORMAL_TABLE){
+ schemaRow = &smrTable.me.ntbEntry.schemaRow;
+ STR_TO_VARSTR(typeName, "NORMAL_TABLE");
+ }
+
+ sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
+ metaReaderClear(&smrTable);
+
+ if (numOfRows > 0) {
+ relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
+ numOfRows = 0;
+ }
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
+ setOperatorCompleted(pOperator);
+ return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
+ }
+
+ int32_t ret = 0;
+ if (pInfo->pCur == NULL) {
+ pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
+ }
+
+ SHashObj *stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+ taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
+ while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
+ char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+
+ SSchemaWrapper *schemaRow = NULL;
+
+ if(pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE){
+ qDebug("sysTableScanUserCols cursor get super table");
+ void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
+ if(schema == NULL){
+ SSchemaWrapper *schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
+ taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
+ }
+ continue;
+ }else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
+ qDebug("sysTableScanUserCols cursor get child table");
+ STR_TO_VARSTR(typeName, "CHILD_TABLE");
+ STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
+
+ int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
+ void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
+ if(schema != NULL){
+ schemaRow = *(SSchemaWrapper **)schema;
+ }else{
+ tDecoderClear(&pInfo->pCur->mr.coder);
+ int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
+ if (code != TSDB_CODE_SUCCESS) {
+ // terrno has been set by metaGetTableEntryByName, therefore, return directly
+ qError("sysTableScanUserCols get meta by suid:%"PRId64 " error, code:%d", suid, code);
+ blockDataDestroy(dataBlock);
+ pInfo->loadInfo.totalRows = 0;
+ taosHashCleanup(stableSchema);
+ return NULL;
+ }
+ schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
+ }
+ }else if(pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE){
+ qDebug("sysTableScanUserCols cursor get normal table");
+ schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow;
+ STR_TO_VARSTR(typeName, "NORMAL_TABLE");
+ STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
+ }else{
+ qDebug("sysTableScanUserCols cursor get invalid table");
+ continue;
+ }
+
+ sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
+
+ if (numOfRows >= pOperator->resultInfo.capacity) {
+ relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
+ numOfRows = 0;
+
+ if (pInfo->pRes->info.rows > 0) {
+ break;
+ }
+ }
+ }
+
+ taosHashCleanup(stableSchema);
+
+ if (numOfRows > 0) {
+ relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
+ numOfRows = 0;
+ }
+
+ blockDataDestroy(dataBlock);
+ if (ret != 0) {
+ metaCloseTbCursor(pInfo->pCur);
+ pInfo->pCur = NULL;
+ setOperatorCompleted(pOperator);
+ }
+
+ pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
+ qDebug("sysTableScanUserCols get cols success, rows:%" PRIu64, pInfo->loadInfo.totalRows);
+
+ return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
+}
+
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
@@ -491,7 +665,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
- while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
+ while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) {
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
continue;
}
@@ -728,6 +902,67 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
return TSDB_CODE_SUCCESS;
}
+static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname,
+ int32_t* pNumOfRows, const SSDataBlock* dataBlock, char* tName,
+ SSchemaWrapper* schemaRow, char* tableType) {
+ if(schemaRow == NULL){
+ qError("sysTableUserColsFillOneTableCols schemaRow is NULL");
+ return TSDB_CODE_SUCCESS;
+ }
+ int32_t numOfRows = *pNumOfRows;
+
+ int32_t numOfCols = schemaRow->nCols;
+ for (int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* pColInfoData = NULL;
+
+ // table name
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
+ colDataAppend(pColInfoData, numOfRows, tName, false);
+
+ // database name
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
+ colDataAppend(pColInfoData, numOfRows, dbname, false);
+
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
+ colDataAppend(pColInfoData, numOfRows, tableType, false);
+
+ // col name
+ char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(colName, schemaRow->pSchema[i].name);
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
+ colDataAppend(pColInfoData, numOfRows, colName, false);
+
+ // col type
+ int8_t colType = schemaRow->pSchema[i].type;
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
+ char colTypeStr[VARSTR_HEADER_SIZE + 32];
+ int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
+ if (colType == TSDB_DATA_TYPE_VARCHAR) {
+ colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE));
+ } else if (colType == TSDB_DATA_TYPE_NCHAR) {
+ colTypeLen += sprintf(
+ varDataVal(colTypeStr) + colTypeLen, "(%d)",
+ (int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
+ }
+ varDataSetLen(colTypeStr, colTypeLen);
+ colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false);
+
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
+ colDataAppend(pColInfoData, numOfRows, (const char*)&schemaRow->pSchema[i].bytes, false);
+
+ for (int32_t j = 6; j <= 8; ++j) {
+ pColInfoData = taosArrayGet(dataBlock->pDataBlock, j);
+ colDataAppendNULL(pColInfoData, numOfRows);
+ }
+ ++numOfRows;
+ }
+
+ *pNumOfRows = numOfRows;
+
+ return TSDB_CODE_SUCCESS;
+}
+
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
@@ -1029,7 +1264,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t ret = 0;
- while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
+ while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name
@@ -1315,12 +1550,19 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if (pInfo->showRewrite) {
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
+ }else if(strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0){
+ getDBNameFromCondition(pInfo->pCondition, dbName);
+ if(dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
+ sysTableIsCondOnOneTable(pInfo->pCondition, pInfo->req.filterTb);
}
+
SSDataBlock* pBlock = NULL;
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
pBlock = sysTableScanUserTags(pOperator);
+ } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->readHandle.mnd == NULL) {
+ pBlock = sysTableScanUserCols(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && pInfo->showRewrite &&
IS_SYS_DBNAME(dbName)) {
pBlock = sysTableScanUserSTables(pOperator);
@@ -1391,7 +1633,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {
- qDebug("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
+ qError("%s load meta data from mnode failed, totalRows:%" PRIu64 ", code:%s", GET_TASKID(pTaskInfo),
pInfo->loadInfo.totalRows, tstrerror(pTaskInfo->code));
return NULL;
}
@@ -1427,6 +1669,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
const char* pUser, SExecTaskInfo* pTaskInfo) {
+ int32_t code = TDB_CODE_SUCCESS;
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@@ -1437,7 +1680,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t num = 0;
- int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
+ code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@@ -1513,7 +1756,8 @@ void destroySysScanOperator(void* param) {
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
- strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
+ strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 ||
+ strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0|| pInfo->pCur != NULL) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
}
diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c
index fae62626fa3666909208ce9220215baea8be3810..126027c78f0629cf5490cce490cc819fa4e2984f 100644
--- a/source/libs/parser/src/parAstParser.c
+++ b/source/libs/parser/src/parAstParser.c
@@ -166,7 +166,7 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, const c
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code &&
- (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) &&
+ (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS) || 0 == strcmp(pTable, TSDB_INS_TABLE_TABLES) || 0 == strcmp(pTable, TSDB_INS_TABLE_COLS)) &&
QUERY_NODE_SELECT_STMT == nodeType(pCxt->pStmt)) {
code = collectMetaKeyFromInsTags(pCxt);
}
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 8d1660be723386f03169f6c67869c820381436af..a9c6d42cd456ecfc1f832e1e61b9e72c1d73f25e 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -2210,7 +2210,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
}
static bool sysTableFromVnode(const char* pTable) {
- return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)));
+ return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || (0 == strcmp(pTable, TSDB_INS_TABLE_COLS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@@ -2278,7 +2278,8 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
- if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) {
+ if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) ||
+ 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
}
@@ -2376,7 +2377,8 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
int8_t tableType = pRealTable->pMeta->tableType;
if (TSDB_SYSTEM_TABLE == tableType) {
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) &&
- 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS);
+ 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
+ 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS);
}
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
}
@@ -5740,7 +5742,7 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
int32_t index = 0;
SNode* pProj = NULL;
FOREACH(pProj, pProjections) {
- SSchema* pSchema = pSchemas + index;
+ SSchema* pSchema = pSchemas + index++;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
SNode* pFunc = NULL;
@@ -5761,7 +5763,7 @@ typedef struct SProjColPos {
} SProjColPos;
static int32_t projColPosCompar(const void* l, const void* r) {
- return ((SProjColPos*)l)->colId < ((SProjColPos*)r)->colId;
+ return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId;
}
static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); }
@@ -5856,7 +5858,11 @@ static int32_t adjustStreamQueryForExistTable(STranslateContext* pCxt, SCreateSt
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
}
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
+ pReq->targetStbUid = 0;
return TSDB_CODE_SUCCESS;
+ } else {
+ pReq->createStb = STREAM_CREATE_STABLE_FALSE;
+ pReq->targetStbUid = pMeta->suid;
}
if (TSDB_CODE_SUCCESS == code) {
code = adjustStreamQueryForExistTableImpl(pCxt, pStmt, pMeta);
diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp
index ae702ec02f1570ab9d92976abdb18059cd727175..c3f6c3ac72c70b799500b1e70cc31b72e585d575 100644
--- a/source/libs/parser/test/mockCatalog.cpp
+++ b/source/libs/parser/test/mockCatalog.cpp
@@ -102,6 +102,10 @@ void generateInformationSchema(MockCatalogService* mcs) {
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
.done();
+ mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_COLS, TSDB_SYSTEM_TABLE, 2)
+ .addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
+ .addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
+ .done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_PRIVILEGES, TSDB_SYSTEM_TABLE, 2)
.addColumn("user_name", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN)
.addColumn("privilege", TSDB_DATA_TYPE_BINARY, 10)
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 120a8537417562a4002268ba4f897a3811b0eaa4..b47f0bc043644a66dfb07cc37d0b48aea940efbc 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -1334,11 +1334,12 @@ static int32_t smaIndexOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNod
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ nodesDestroyNode((SNode*)pScan->node.pParent);
+ }
return code;
}
-static void smaIndexOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
-
static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes);
@@ -1348,8 +1349,6 @@ static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogi
code = smaIndexOptCouldApplyIndex(pScan, pIndex, &pSmaCols);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
code = smaIndexOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols);
- taosArrayDestroyEx(pScan->pSmaIndexes, smaIndexOptDestroySmaIndex);
- pScan->pSmaIndexes = NULL;
pCxt->optimized = true;
break;
}
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index 7d6238193d7e6411cffd9bd8b092afa01e651b7a..f83704be8783ac0f65c02823a3ace1b1031685b7 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -609,7 +609,8 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan->accountId = pCxt->pPlanCxt->acctId;
pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
- 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS)) {
+ 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
+ 0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
pSubplan->execNode.nodeId = MNODE_HANDLE;
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 00bad876a10891fa30c2c2ef002ddcfea6105d11..d77f6f170344fc3a5ac67c05250889eeb8da4cda 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -247,8 +247,8 @@
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
-,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
-,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
+,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
+,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim
@@ -1053,6 +1053,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
+,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim
index 045e908a578af1a1a1248574a4dd70c2af5cf8b2..9b16d982026b1648da938d9f74b4c0ee52f989c4 100644
--- a/tests/script/tsim/query/sys_tbname.sim
+++ b/tests/script/tsim/query/sys_tbname.sim
@@ -53,7 +53,7 @@ endi
sql select tbname from information_schema.ins_tables;
print $rows $data00
-if $rows != 32 then
+if $rows != 33 then
return -1
endi
if $data00 != @ins_tables@ then
diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim
new file mode 100644
index 0000000000000000000000000000000000000000..2ed6958196e7defc114378f06f889d5b00b032a1
--- /dev/null
+++ b/tests/script/tsim/stream/checkStreamSTable.sim
@@ -0,0 +1,310 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+
+print ===== step1
+
+system sh/exec.sh -n dnode1 -s start
+sleep 50
+sql connect
+
+print ===== step2
+
+sql create database result vgroups 1;
+
+sql create database test vgroups 4;
+sql use test;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+
+sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int);
+
+sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
+sql insert into t1 values(1648791213000,1,2,3);
+sql insert into t2 values(1648791213000,2,2,3);
+
+$loop_count = 0
+
+sql select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s);
+print $data00, $data01, $data02
+print $data10, $data11, $data12
+print $data20, $data21, $data22
+
+loop0:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result.streamt0 order by ta;
+
+if $rows != 2 then
+ print =====rows=$rows
+ print $data00, $data01, $data02
+ print $data10, $data11, $data12
+ print $data20, $data21, $data22
+ goto loop0
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop0
+endi
+
+if $data02 != 1 then
+ print =====data02=$data02
+ goto loop0
+endi
+
+if $data11 != 1 then
+ print =====data11=$data11
+ goto loop0
+endi
+
+if $data12 != 2 then
+ print =====data12=$data12
+ goto loop0
+endi
+
+print ===== step3
+
+sql create database result1 vgroups 1;
+
+sql create database test1 vgroups 4;
+sql use test1;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+
+sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int);
+
+sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
+sql insert into t1 values(1648791213000,10,20,30);
+sql insert into t2 values(1648791213000,40,50,60);
+
+$loop_count = 0
+
+sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s);
+print $data00, $data01, $data02, $data03
+print $data10, $data11, $data12, $data13
+print $data20, $data21, $data22, $data23
+
+loop1:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result1.streamt1 order by ta;
+
+if $rows != 2 then
+ print =====rows=$rows
+ print $data00, $data01, $data02, $data03
+ print $data10, $data11, $data12, $data13
+ print $data20, $data21, $data22, $data23
+ goto loop1
+endi
+
+if $data01 != 10 then
+ print =====data01=$data01
+ goto loop1
+endi
+
+if $data02 != 20 then
+ print =====data02=$data02
+ goto loop1
+endi
+
+if $data03 != 1 then
+ print =====data03=$data03
+ goto loop1
+endi
+
+if $data11 != 40 then
+ print =====data11=$data11
+ goto loop1
+endi
+
+if $data12 != 50 then
+ print =====data12=$data12
+ goto loop1
+endi
+
+if $data13 != 1 then
+ print =====data13=$data13
+ goto loop1
+endi
+
+
+print ===== step4
+
+sql create database result2 vgroups 1;
+
+sql create database test2 vgroups 4;
+sql use test2;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+
+sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20));
+
+# tag dest 1, source 2
+##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s);
+
+# column dest 3, source 4
+sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
+
+# column dest 3, source 4
+sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b) as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s);
+
+# column dest 3, source 2
+sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s);
+
+# column dest 3, source 2
+sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
+
+
+print ===== step5
+
+sql create database result3 vgroups 1;
+
+sql create database test3 vgroups 4;
+sql use test3;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,2,3);
+sql create table t2 using st tags(4,5,6);
+
+sql create stable result3.streamt3(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
+
+sql create stream streams3 trigger at_once into result3.streamt3(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
+
+sql insert into t1 values(1648791213000,10,20,30);
+sql insert into t2 values(1648791213000,40,50,60);
+
+$loop_count = 0
+
+sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
+print $data00, $data01, $data02, $data03, $data04
+print $data10, $data11, $data12, $data13, $data14
+print $data20, $data21, $data22, $data23, $data24
+
+loop2:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result3.streamt3;
+
+if $rows != 1 then
+ print =====rows=$rows
+ print $data00, $data01, $data02, $data03
+ print $data10, $data11, $data12, $data13
+ print $data20, $data21, $data22, $data23
+ goto loop2
+endi
+
+if $data01 != 40 then
+ print =====data01=$data01
+ goto loop2
+endi
+
+if $data02 != 20 then
+ print =====data02=$data02
+ goto loop2
+endi
+
+if $data03 != 2 then
+ print =====data03=$data03
+ goto loop2
+endi
+
+if $data04 != NULL then
+ print =====data04=$data04
+ goto loop2
+endi
+
+print ===== step6
+
+sql create database result4 vgroups 1;
+
+sql create database test4 vgroups 4;
+sql use test4;
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,2,3);
+sql create table t2 using st tags(4,5,6);
+
+sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int);
+
+sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s);
+
+sql insert into t1 values(1648791213000,10,20,30);
+sql insert into t2 values(1648791213000,40,50,60);
+
+$loop_count = 0
+
+sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s);
+print $data00, $data01, $data02, $data03
+print $data10, $data11, $data12, $data13
+print $data20, $data21, $data22, $data23
+
+loop2:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result4.streamt4;
+
+if $rows != 2 then
+ print =====rows=$rows
+ print $data00, $data01, $data02, $data03
+ print $data10, $data11, $data12, $data13
+ print $data20, $data21, $data22, $data23
+ goto loop2
+endi
+
+if $data01 != 40 then
+ print =====data01=$data01
+ goto loop2
+endi
+
+if $data02 != 20 then
+ print =====data02=$data02
+ goto loop2
+endi
+
+if $data03 != 2 then
+ print =====data03=$data03
+ goto loop2
+endi
+
+if $data04 != NULL then
+ print =====data04=$data04
+ goto loop2
+endi
+
+print ======over
+
+system sh/stop_dnodes.sh
diff --git a/tests/script/tsim/stream/tableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim
similarity index 73%
rename from tests/script/tsim/stream/tableAndTag0.sim
rename to tests/script/tsim/stream/udTableAndTag0.sim
index 5e02171bee17bbe23796c9308e546763200997d1..86feca19188b6fa1034830755de809cef9944490 100644
--- a/tests/script/tsim/stream/tableAndTag0.sim
+++ b/tests/script/tsim/stream/udTableAndTag0.sim
@@ -268,6 +268,105 @@ if $data10 != tbn-t2 then
endi
+print ===== step5
+print ===== tag name + table name
+
+sql create database result4 vgroups 1;
+
+sql create database test4 vgroups 4;
+sql use test4;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+sql create table t3 using st tags(3,3,3);
+
+sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
+sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
+
+
+$loop_count = 0
+loop7:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
+
+if $rows != 3 then
+ print =====rows=$rows
+ print $data00 $data10
+ goto loop7
+endi
+
+if $data00 != tbn-t1 then
+ print =====data00=$data00
+ goto loop7
+endi
+
+if $data10 != tbn-t2 then
+ print =====data10=$data10
+ goto loop7
+endi
+
+if $data20 != tbn-t3 then
+ print =====data20=$data20
+ goto loop7
+endi
+
+$loop_count = 0
+loop8:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result4.streamt4 order by 3;
+
+if $rows != 3 then
+ print =====rows=$rows
+ print $data00 $data10
+ goto loop8
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != tag-t1 then
+ print =====data02=$data02
+ goto loop8
+endi
+
+if $data11 != 1 then
+ print =====data11=$data11
+ goto loop8
+endi
+
+if $data12 != tag-t2 then
+ print =====data12=$data12
+ goto loop8
+endi
+
+if $data21 != 1 then
+ print =====data21=$data21
+ goto loop8
+endi
+
+if $data22 != tag-t3 then
+ print =====data22=$data22
+ goto loop8
+endi
+
print ======over
system sh/stop_dnodes.sh
diff --git a/tests/script/tsim/stream/tableAndTag1.sim b/tests/script/tsim/stream/udTableAndTag1.sim
similarity index 74%
rename from tests/script/tsim/stream/tableAndTag1.sim
rename to tests/script/tsim/stream/udTableAndTag1.sim
index 74f67c1fb31e553b964a9a299ab65723374e8271..a0393a03cdd6930ad767a797f4275890ed160fb4 100644
--- a/tests/script/tsim/stream/tableAndTag1.sim
+++ b/tests/script/tsim/stream/udTableAndTag1.sim
@@ -269,6 +269,104 @@ if $data10 != tbn-2 then
goto loop6
endi
+print ===== step5
+print ===== tag name + table name
+
+sql create database result4 vgroups 1;
+
+sql create database test4 vgroups 4;
+sql use test4;
+
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,1,1);
+sql create table t2 using st tags(2,2,2);
+sql create table t3 using st tags(3,3,3);
+
+sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st partition by concat("t", cast(a as varchar(10) ) ) as dd interval(10s);
+sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
+
+
+$loop_count = 0
+loop7:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
+
+if $rows != 3 then
+ print =====rows=$rows
+ print $data00 $data10
+ goto loop7
+endi
+
+if $data00 != tbn-t1 then
+ print =====data00=$data00
+ goto loop7
+endi
+
+if $data10 != tbn-t2 then
+ print =====data10=$data10
+ goto loop7
+endi
+
+if $data20 != tbn-t3 then
+ print =====data20=$data20
+ goto loop7
+endi
+
+$loop_count = 0
+loop8:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from result4.streamt4 order by 3;
+
+if $rows != 3 then
+ print =====rows=$rows
+ print $data00 $data10
+ goto loop8
+endi
+
+if $data01 != 1 then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != t1 then
+ print =====data02=$data02
+ goto loop8
+endi
+
+if $data11 != 1 then
+ print =====data11=$data11
+ goto loop8
+endi
+
+if $data12 != t2 then
+ print =====data12=$data12
+ goto loop8
+endi
+
+if $data21 != 1 then
+ print =====data21=$data21
+ goto loop8
+endi
+
+if $data22 != t3 then
+ print =====data22=$data22
+ goto loop8
+endi
print ======over
diff --git a/tests/system-test/2-query/odbc.py b/tests/system-test/2-query/odbc.py
new file mode 100644
index 0000000000000000000000000000000000000000..09000fb3d2a191566e16fe39e5c9da7b081080e5
--- /dev/null
+++ b/tests/system-test/2-query/odbc.py
@@ -0,0 +1,76 @@
+import taos
+import sys
+import datetime
+import inspect
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.common import tdCom
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor(), False)
+
+ def check_ins_cols(self):
+ tdSql.execute("create database if not exists db")
+ tdSql.execute("create table db.ntb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100))")
+ tdSql.execute("create table db.stb (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t int)")
+ tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
+
+ tdSql.query("select count(*) from information_schema.ins_columns")
+ tdSql.checkData(0, 0, 265)
+
+ tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
+ tdSql.checkRows(14)
+ tdSql.checkData(0, 2, "NORMAL_TABLE")
+
+
+ tdSql.query("select * from information_schema.ins_columns where table_name = 'stb'")
+ tdSql.checkRows(14)
+ tdSql.checkData(0, 2, "SUPER_TABLE")
+
+
+ tdSql.query("select db_name,table_type,col_name,col_type,col_length from information_schema.ins_columns where table_name = 'ctb'")
+ tdSql.checkRows(14)
+ tdSql.checkData(0, 0, "db")
+ tdSql.checkData(1, 1, "CHILD_TABLE")
+ tdSql.checkData(3, 2, "c3")
+ tdSql.checkData(4, 3, "INT")
+ tdSql.checkData(5, 4, 8)
+
+ tdSql.query("desc information_schema.ins_columns")
+ tdSql.checkRows(9)
+ tdSql.checkData(0, 0, "table_name")
+ tdSql.checkData(5, 0, "col_length")
+ tdSql.checkData(1, 2, 64)
+
+ def check_get_db_name(self):
+ buildPath = tdCom.getBuildPath()
+ cmdStr = '%s/build/bin/get_db_name_test'%(buildPath)
+ tdLog.info(cmdStr)
+ ret = os.system(cmdStr)
+ if ret != 0:
+ tdLog.exit("sml_test get_db_name_test != 0")
+
+ def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
+ tdSql.prepare(replica = self.replicaVar)
+
+ tdLog.printNoPrefix("==========start check_ins_cols run ...............")
+ self.check_ins_cols()
+ tdLog.printNoPrefix("==========end check_ins_cols run ...............")
+
+ tdLog.printNoPrefix("==========start check_get_db_name run ...............")
+ self.check_get_db_name()
+ tdLog.printNoPrefix("==========end check_get_db_name run ...............")
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py
index 009862137f8ccbb45f0f9bbfa134ae1a5b0479f1..4dcc0b963f8e883326132f5db3bc04a6edd349d1 100644
--- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py
+++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py
@@ -116,7 +116,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
- keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
@@ -188,7 +188,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
- keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py
index 528b3a80887c3cc62b1092e5183249e962f5e8db..da8ac6c57deaf99d9871134489b290381b570306 100644
--- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py
+++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py
@@ -116,7 +116,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
- keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
consumerId = 4
@@ -188,7 +188,7 @@ class TDTestCase:
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
- keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
+ keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt
index b048b79e9b71058faaccdadeb723c1e0f46dce58..6ca266c55533b644c47ad61eb3bebc273ae7be7f 100644
--- a/utils/test/c/CMakeLists.txt
+++ b/utils/test/c/CMakeLists.txt
@@ -4,6 +4,7 @@ add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(sml_test sml_test.c)
+add_executable(get_db_name_test get_db_name_test.c)
target_link_libraries(
create_table
PUBLIC taos_static
@@ -40,3 +41,11 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
+
+target_link_libraries(
+ get_db_name_test
+ PUBLIC taos_static
+ PUBLIC util
+ PUBLIC common
+ PUBLIC os
+)
diff --git a/utils/test/c/get_db_name_test.c b/utils/test/c/get_db_name_test.c
new file mode 100644
index 0000000000000000000000000000000000000000..ebbfdc84a77a0eb3d76a5b561c9b280b3930c7a6
--- /dev/null
+++ b/utils/test/c/get_db_name_test.c
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include "taos.h"
+#include "types.h"
+#include "tlog.h"
+
+int get_db_test() {
+ TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
+
+ TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db vgroups 2");
+ taos_free_result(pRes);
+
+ pRes = taos_query(taos, "use sml_db");
+ int code = taos_errno(pRes);
+ taos_free_result(pRes);
+ ASSERT(code == 0);
+
+ code = taos_get_current_db(taos, NULL, 0, NULL);
+ ASSERT(code != 0);
+
+ int required = 0;
+ code = taos_get_current_db(taos, NULL, 0, &required);
+ ASSERT(code != 0);
+ ASSERT(required == 7);
+
+ char database[10] = {0};
+ code = taos_get_current_db(taos, database, 3, &required);
+ ASSERT(code != 0);
+ ASSERT(required == 7);
+ ASSERT(strcpy(database, "sm"));
+
+ char database1[10] = {0};
+ code = taos_get_current_db(taos, database1, 10, &required);
+ ASSERT(code == 0);
+ ASSERT(strcpy(database1, "sml_db"));
+
+ taos_close(taos);
+
+ return code;
+}
+
+int main(int argc, char *argv[]) {
+ int ret = 0;
+ ret = get_db_test();
+ ASSERT(!ret);
+ return ret;
+}