diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index ab1609f35f15cd625cb2346ffe80ffad7330658e..13b247770ea7eef6b64209ca98787ff6d733bf85 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG 69eee2e
+ GIT_TAG 213f8b3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx
index 17b3f5caa062eaacb4216b7153e899040e702cc1..94a9dbffbd2bc5199edf6f2a6e4561355d967705 100644
--- a/docs/en/07-develop/07-tmq.mdx
+++ b/docs/en/07-develop/07-tmq.mdx
@@ -117,19 +117,22 @@ class TaosConsumer():
```go
-func NewConsumer(conf *Config) (*Consumer, error)
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
-func (c *Consumer) Close() error
-
-func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+// rebalanceCb is reserved for compatibility purpose
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
-func (c *Consumer) FreeMessage(message unsafe.Pointer)
+// rebalanceCb is reserved for compatibility purpose
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
-func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
-func (c *Consumer) Subscribe(topics []string) error
+// tmq.TopicPartition is reserved for compatibility purpose
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
+
+func (c *Consumer) Close() error
```
@@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer {
```go
-config := tmq.NewConfig()
-defer config.Destroy()
-err = config.SetGroupID("test")
-if err != nil {
- panic(err)
-}
-err = config.SetAutoOffsetReset("earliest")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectIP("127.0.0.1")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectUser("root")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPass("taosdata")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPort("6030")
-if err != nil {
- panic(err)
-}
-err = config.SetMsgWithTableName(true)
-if err != nil {
- panic(err)
-}
-err = config.EnableHeartBeat()
-if err != nil {
- panic(err)
-}
-err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
- }
-})
-if err != nil {
- panic(err)
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
}
+consumer, err := NewConsumer(conf)
```
@@ -523,11 +496,7 @@ consumer.subscribe(topics);
```go
-consumer, err := tmq.NewConsumer(config)
-if err != nil {
- panic(err)
-}
-err = consumer.Subscribe([]string{"example_tmq_topic"})
+err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@@ -611,13 +580,17 @@ while(running){
```go
for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- fmt.Println(result)
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
}
```
@@ -729,7 +702,11 @@ consumer.close();
```go
-consumer.Close()
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
+
+/* Close consumer */
+_ = consumer.Close()
```
diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx
index df5b129cea552144d5833190d46e8a78f2fd2fa5..60407c0735bf9bcb42ae54bddcc9afa639a02fcc 100644
--- a/docs/en/14-reference/03-connector/05-go.mdx
+++ b/docs/en/14-reference/03-connector/05-go.mdx
@@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man
#### Subscribe
-* `func NewConsumer(conf *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group.
-* `func (c *Consumer) Subscribe(topics []string) error`
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
+
+Subscribes a topic.
+
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics.
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information.
-* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
-* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
-
-Free information.
-
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
@@ -441,25 +444,36 @@ Close consumer.
### Subscribe via WebSocket
-* `func NewConsumer(config *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
- Creates consumer group.
+Creates consumer group.
+
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
-* `func (c *Consumer) Subscribe(topic []string) error`
+Subscribes a topic.
- Subscribes to topics.
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+Note: `rebalanceCb` is reserved for compatibility purpose
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+Subscribes to topics.
- Polling information.
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
-* `func (c *Consumer) Commit(messageID uint64) error`
+Polling information.
- Commit information.
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+Note: `tmq.TopicPartition` is reserved for compatibility purpose
+
+Commit information.
+
+* `func (c *Consumer) Unsubscribe() error`
+
+Unsubscribe.
* `func (c *Consumer) Close() error`
- Close consumer.
+Close consumer.
For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)
diff --git a/docs/examples/go/go.mod b/docs/examples/go/go.mod
index 2bc1a74cb6ef14221fa384701773dc73fe3b161d..6696852312f39b93df189beba89f08c1945ef9f1 100644
--- a/docs/examples/go/go.mod
+++ b/docs/examples/go/go.mod
@@ -2,5 +2,5 @@ module goexample
go 1.17
-require github.com/taosdata/driver-go/v3 3.0
+require github.com/taosdata/driver-go/v3 3.1.0
diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go
index a13d394a2c5009c1ad88684109b6f16b4d8a0540..1f7218936fbe457615562ded1b938daca95225cb 100644
--- a/docs/examples/go/sub/main.go
+++ b/docs/examples/go/sub/main.go
@@ -1,17 +1,12 @@
package main
import (
- "context"
- "encoding/json"
"fmt"
- "strconv"
- "time"
+ "os"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
- "github.com/taosdata/driver-go/v3/common"
- "github.com/taosdata/driver-go/v3/errors"
- "github.com/taosdata/driver-go/v3/wrapper"
+ tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
func main() {
@@ -28,79 +23,56 @@ func main() {
if err != nil {
panic(err)
}
- config := tmq.NewConfig()
- defer config.Destroy()
- err = config.SetGroupID("test")
if err != nil {
panic(err)
}
- err = config.SetAutoOffsetReset("earliest")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectIP("127.0.0.1")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectUser("root")
- if err != nil {
- panic(err)
- }
- err = config.SetConnectPass("taosdata")
+ consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_client",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
+ })
if err != nil {
panic(err)
}
- err = config.SetConnectPort("6030")
+ err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
- err = config.SetMsgWithTableName(true)
+ _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
- err = config.EnableHeartBeat()
+ _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
- err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
+ for i := 0; i < 5; i++ {
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.String())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- })
- if err != nil {
- panic(err)
}
- consumer, err := tmq.NewConsumer(config)
+ err = consumer.Unsubscribe()
if err != nil {
panic(err)
}
- err = consumer.Subscribe([]string{"example_tmq_topic"})
+ err = consumer.Close()
if err != nil {
panic(err)
}
- _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
- if err != nil {
- panic(err)
- }
- _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
- if err != nil {
- panic(err)
- }
- for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
- }
- if result.Type != common.TMQ_RES_DATA {
- panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
- }
- data, _ := json.Marshal(result.Data)
- fmt.Println(string(data))
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
- break
- }
- consumer.Close()
}
diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index 1f5a089aaa2a051e238eedc0315c37cad643b33f..039e9eb6356f55d2cadd83cd4c3ce5e3082c04d1 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -115,19 +115,22 @@ class TaosConsumer():
```go
-func NewConsumer(conf *Config) (*Consumer, error)
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
-func (c *Consumer) Close() error
-
-func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
-func (c *Consumer) FreeMessage(message unsafe.Pointer)
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
-func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
-func (c *Consumer) Subscribe(topics []string) error
+// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error
+
+func (c *Consumer) Close() error
```
@@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer {
```go
-config := tmq.NewConfig()
-defer config.Destroy()
-err = config.SetGroupID("test")
-if err != nil {
- panic(err)
-}
-err = config.SetAutoOffsetReset("earliest")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectIP("127.0.0.1")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectUser("root")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPass("taosdata")
-if err != nil {
- panic(err)
-}
-err = config.SetConnectPort("6030")
-if err != nil {
- panic(err)
-}
-err = config.SetMsgWithTableName(true)
-if err != nil {
- panic(err)
-}
-err = config.EnableHeartBeat()
-if err != nil {
- panic(err)
-}
-err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
- if result.ErrCode != 0 {
- errStr := wrapper.TMQErr2Str(result.ErrCode)
- err := errors.NewError(int(result.ErrCode), errStr)
- panic(err)
- }
-})
-if err != nil {
- panic(err)
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "earliest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "enable.heartbeat.background": "true",
+ "experimental.snapshot.enable": "true",
+ "msg.with.table.name": "true",
}
+consumer, err := NewConsumer(conf)
```
@@ -532,11 +505,7 @@ consumer.subscribe(topics);
```go
-consumer, err := tmq.NewConsumer(config)
-if err != nil {
- panic(err)
-}
-err = consumer.Subscribe([]string{"example_tmq_topic"})
+err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
@@ -620,13 +589,17 @@ while(running){
```go
for {
- result, err := consumer.Poll(time.Second)
- if err != nil {
- panic(err)
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
}
- fmt.Println(result)
- consumer.Commit(context.Background(), result.Message)
- consumer.FreeMessage(result.Message)
}
```
@@ -738,7 +711,11 @@ consumer.close();
```go
-consumer.Close()
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
+
+/* Close consumer */
+_ = consumer.Close()
```
diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx
index 0fc4007f6362697222b425c8c2c803b911b9ac8a..2aa1a58e49f34b412f12bd0d67586dc6e56cf0bc 100644
--- a/docs/zh/08-connector/20-go.mdx
+++ b/docs/zh/08-connector/20-go.mdx
@@ -15,7 +15,7 @@ import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx"
-`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
+`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
`driver-go` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。
@@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。
```text
username:password@protocol(address)/dbname?param=value
```
+
### 使用连接器进行连接
@@ -176,6 +177,7 @@ func main() {
}
}
```
+
@@ -207,6 +209,7 @@ func main() {
}
}
```
+
@@ -357,33 +360,32 @@ func main() {
#### 订阅
-* `func NewConsumer(conf *Config) (*Consumer, error)`
-
-创建消费者。
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
-* `func (c *Consumer) Subscribe(topics []string) error`
+ 创建消费者。
-订阅主题。
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+ 订阅单个主题。
-轮询消息。
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
-* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error`
+ 订阅主题。
-提交消息。
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
-* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
+ 轮询消息。
-释放消息。
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
-* `func (c *Consumer) Unsubscribe() error`
-
-取消订阅。
+ 提交消息。
* `func (c *Consumer) Close() error`
-关闭消费者。
+ 关闭连接。
#### schemaless
@@ -443,25 +445,32 @@ func main() {
### 通过 WebSocket 订阅
-* `func NewConsumer(config *Config) (*Consumer, error)`
+* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
+
+ 创建消费者。
+
+* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
- 创建消费者。
+ 订阅单个主题。
-* `func (c *Consumer) Subscribe(topic []string) error`
+* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
+注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
- 订阅主题。
+ 订阅主题。
-* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)`
+* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
- 轮询消息。
+ 轮询消息。
-* `func (c *Consumer) Commit(messageID uint64) error`
+* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
+注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
- 提交消息。
+ 提交消息。
* `func (c *Consumer) Close() error`
- 关闭消费者。
+ 关闭连接。
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)