From dba94bf5a02e28e3833cae5896e5495dcb558c1a Mon Sep 17 00:00:00 2001 From: Xuefeng Tan <1172915550@qq.com> Date: Sat, 14 Jan 2023 15:29:33 +0800 Subject: [PATCH] enh(driver-go): redesign of go tmq api (#19553) * enh(driver-go): redesign of go tmq api * docs: add comments for reserved parameter Co-authored-by: Shuduo Sang --- cmake/taosadapter_CMakeLists.txt.in | 2 +- docs/en/07-develop/07-tmq.mdx | 101 ++++++++------------ docs/en/14-reference/03-connector/05-go.mdx | 48 ++++++---- docs/examples/go/go.mod | 2 +- docs/examples/go/sub/main.go | 90 ++++++----------- docs/zh/07-develop/07-tmq.mdx | 101 ++++++++------------ docs/zh/08-connector/20-go.mdx | 59 +++++++----- 7 files changed, 176 insertions(+), 227 deletions(-) diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index ab1609f35f..13b247770e 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 69eee2e + GIT_TAG 213f8b3 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index 17b3f5caa0..94a9dbffbd 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -117,19 +117,22 @@ class TaosConsumer(): ```go -func NewConsumer(conf *Config) (*Consumer, error) +func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) -func (c *Consumer) Close() error - -func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error +// rebalanceCb is reserved for compatibility purpose +func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error -func (c *Consumer) FreeMessage(message unsafe.Pointer) +// rebalanceCb is reserved for compatibility purpose +func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error -func (c *Consumer) Poll(timeout time.Duration) (*Result, error) +func (c *Consumer) Poll(timeoutMs int) tmq.Event -func (c *Consumer) Subscribe(topics []string) error +// tmq.TopicPartition is reserved for compatibility purpose +func (c *Consumer) Commit() ([]tmq.TopicPartition, error) func (c *Consumer) Unsubscribe() error + +func (c *Consumer) Close() error ``` @@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go -config := tmq.NewConfig() -defer config.Destroy() -err = config.SetGroupID("test") -if err != nil { - panic(err) -} -err = config.SetAutoOffsetReset("earliest") -if err != nil { - panic(err) -} -err = config.SetConnectIP("127.0.0.1") -if err != nil { - panic(err) -} -err = config.SetConnectUser("root") -if err != nil { - panic(err) -} -err = config.SetConnectPass("taosdata") -if err != nil { - panic(err) -} -err = config.SetConnectPort("6030") -if err != nil { - panic(err) -} -err = config.SetMsgWithTableName(true) -if err != nil { - panic(err) -} -err = config.EnableHeartBeat() -if err != nil { - panic(err) -} -err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { - if result.ErrCode != 0 { - errStr := wrapper.TMQErr2Str(result.ErrCode) - err := errors.NewError(int(result.ErrCode), errStr) - panic(err) - } -}) -if err != nil { - panic(err) +conf := &tmq.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "earliest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + "enable.heartbeat.background": "true", + "experimental.snapshot.enable": "true", + "msg.with.table.name": "true", } +consumer, err := NewConsumer(conf) ``` @@ -523,11 +496,7 @@ consumer.subscribe(topics); ```go -consumer, err := tmq.NewConsumer(config) -if err != nil { - panic(err) -} -err = consumer.Subscribe([]string{"example_tmq_topic"}) +err = consumer.Subscribe("example_tmq_topic", nil) if err != nil { panic(err) } @@ -611,13 +580,17 @@ while(running){ ```go for { - result, err := consumer.Poll(time.Second) - if err != nil { - panic(err) + ev := consumer.Poll(0) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + fmt.Println(e.Value()) + case tmqcommon.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + panic(e) + } + consumer.Commit() } - fmt.Println(result) - consumer.Commit(context.Background(), result.Message) - consumer.FreeMessage(result.Message) } ``` @@ -729,7 +702,11 @@ consumer.close(); ```go -consumer.Close() +/* Unsubscribe */ +_ = consumer.Unsubscribe() + +/* Close consumer */ +_ = consumer.Close() ``` diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index df5b129cea..60407c0735 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man #### Subscribe -* `func NewConsumer(conf *Config) (*Consumer, error)` +* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)` Creates consumer group. -* `func (c *Consumer) Subscribe(topics []string) error` +* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error` +Note: `rebalanceCb` is reserved for compatibility purpose + +Subscribes a topic. + +* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error` +Note: `rebalanceCb` is reserved for compatibility purpose Subscribes to topics. -* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` +* `func (c *Consumer) Poll(timeoutMs int) tmq.Event` Polling information. -* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error` +* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)` +Note: `tmq.TopicPartition` is reserved for compatibility purpose Commit information. -* `func (c *Consumer) FreeMessage(message unsafe.Pointer)` - -Free information. - * `func (c *Consumer) Unsubscribe() error` Unsubscribe. @@ -441,25 +444,36 @@ Close consumer. ### Subscribe via WebSocket -* `func NewConsumer(config *Config) (*Consumer, error)` +* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)` - Creates consumer group. +Creates consumer group. + +* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error` +Note: `rebalanceCb` is reserved for compatibility purpose -* `func (c *Consumer) Subscribe(topic []string) error` +Subscribes a topic. - Subscribes to topics. +* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error` +Note: `rebalanceCb` is reserved for compatibility purpose -* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` +Subscribes to topics. - Polling information. +* `func (c *Consumer) Poll(timeoutMs int) tmq.Event` -* `func (c *Consumer) Commit(messageID uint64) error` +Polling information. - Commit information. +* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)` +Note: `tmq.TopicPartition` is reserved for compatibility purpose + +Commit information. + +* `func (c *Consumer) Unsubscribe() error` + +Unsubscribe. * `func (c *Consumer) Close() error` - Close consumer. +Close consumer. For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) diff --git a/docs/examples/go/go.mod b/docs/examples/go/go.mod index 2bc1a74cb6..6696852312 100644 --- a/docs/examples/go/go.mod +++ b/docs/examples/go/go.mod @@ -2,5 +2,5 @@ module goexample go 1.17 -require github.com/taosdata/driver-go/v3 3.0 +require github.com/taosdata/driver-go/v3 3.1.0 diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go index a13d394a2c..1f7218936f 100644 --- a/docs/examples/go/sub/main.go +++ b/docs/examples/go/sub/main.go @@ -1,17 +1,12 @@ package main import ( - "context" - "encoding/json" "fmt" - "strconv" - "time" + "os" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" - "github.com/taosdata/driver-go/v3/common" - "github.com/taosdata/driver-go/v3/errors" - "github.com/taosdata/driver-go/v3/wrapper" + tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" ) func main() { @@ -28,79 +23,56 @@ func main() { if err != nil { panic(err) } - config := tmq.NewConfig() - defer config.Destroy() - err = config.SetGroupID("test") if err != nil { panic(err) } - err = config.SetAutoOffsetReset("earliest") - if err != nil { - panic(err) - } - err = config.SetConnectIP("127.0.0.1") - if err != nil { - panic(err) - } - err = config.SetConnectUser("root") - if err != nil { - panic(err) - } - err = config.SetConnectPass("taosdata") + consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "earliest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "enable.heartbeat.background": "true", + "experimental.snapshot.enable": "true", + "msg.with.table.name": "true", + }) if err != nil { panic(err) } - err = config.SetConnectPort("6030") + err = consumer.Subscribe("example_tmq_topic", nil) if err != nil { panic(err) } - err = config.SetMsgWithTableName(true) + _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)") if err != nil { panic(err) } - err = config.EnableHeartBeat() + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") if err != nil { panic(err) } - err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { - if result.ErrCode != 0 { - errStr := wrapper.TMQErr2Str(result.ErrCode) - err := errors.NewError(int(result.ErrCode), errStr) - panic(err) + for i := 0; i < 5; i++ { + ev := consumer.Poll(0) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + fmt.Println(e.String()) + case tmqcommon.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + panic(e) + } + consumer.Commit() } - }) - if err != nil { - panic(err) } - consumer, err := tmq.NewConsumer(config) + err = consumer.Unsubscribe() if err != nil { panic(err) } - err = consumer.Subscribe([]string{"example_tmq_topic"}) + err = consumer.Close() if err != nil { panic(err) } - _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)") - if err != nil { - panic(err) - } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } - for { - result, err := consumer.Poll(time.Second) - if err != nil { - panic(err) - } - if result.Type != common.TMQ_RES_DATA { - panic("want message type 1 got " + strconv.Itoa(int(result.Type))) - } - data, _ := json.Marshal(result.Data) - fmt.Println(string(data)) - consumer.Commit(context.Background(), result.Message) - consumer.FreeMessage(result.Message) - break - } - consumer.Close() } diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 1f5a089aaa..039e9eb635 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -115,19 +115,22 @@ class TaosConsumer(): ```go -func NewConsumer(conf *Config) (*Consumer, error) +func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) -func (c *Consumer) Close() error - -func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error +// 出于兼容目的保留 rebalanceCb 参数,当前未使用 +func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error -func (c *Consumer) FreeMessage(message unsafe.Pointer) +// 出于兼容目的保留 rebalanceCb 参数,当前未使用 +func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error -func (c *Consumer) Poll(timeout time.Duration) (*Result, error) +func (c *Consumer) Poll(timeoutMs int) tmq.Event -func (c *Consumer) Subscribe(topics []string) error +// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 +func (c *Consumer) Commit() ([]tmq.TopicPartition, error) func (c *Consumer) Unsubscribe() error + +func (c *Consumer) Close() error ``` @@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go -config := tmq.NewConfig() -defer config.Destroy() -err = config.SetGroupID("test") -if err != nil { - panic(err) -} -err = config.SetAutoOffsetReset("earliest") -if err != nil { - panic(err) -} -err = config.SetConnectIP("127.0.0.1") -if err != nil { - panic(err) -} -err = config.SetConnectUser("root") -if err != nil { - panic(err) -} -err = config.SetConnectPass("taosdata") -if err != nil { - panic(err) -} -err = config.SetConnectPort("6030") -if err != nil { - panic(err) -} -err = config.SetMsgWithTableName(true) -if err != nil { - panic(err) -} -err = config.EnableHeartBeat() -if err != nil { - panic(err) -} -err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { - if result.ErrCode != 0 { - errStr := wrapper.TMQErr2Str(result.ErrCode) - err := errors.NewError(int(result.ErrCode), errStr) - panic(err) - } -}) -if err != nil { - panic(err) +conf := &tmq.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "earliest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + "enable.heartbeat.background": "true", + "experimental.snapshot.enable": "true", + "msg.with.table.name": "true", } +consumer, err := NewConsumer(conf) ``` @@ -532,11 +505,7 @@ consumer.subscribe(topics); ```go -consumer, err := tmq.NewConsumer(config) -if err != nil { - panic(err) -} -err = consumer.Subscribe([]string{"example_tmq_topic"}) +err = consumer.Subscribe("example_tmq_topic", nil) if err != nil { panic(err) } @@ -620,13 +589,17 @@ while(running){ ```go for { - result, err := consumer.Poll(time.Second) - if err != nil { - panic(err) + ev := consumer.Poll(0) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + fmt.Println(e.Value()) + case tmqcommon.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + panic(e) + } + consumer.Commit() } - fmt.Println(result) - consumer.Commit(context.Background(), result.Message) - consumer.FreeMessage(result.Message) } ``` @@ -738,7 +711,11 @@ consumer.close(); ```go -consumer.Close() +/* Unsubscribe */ +_ = consumer.Unsubscribe() + +/* Close consumer */ +_ = consumer.Close() ``` diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx index 0fc4007f63..2aa1a58e49 100644 --- a/docs/zh/08-connector/20-go.mdx +++ b/docs/zh/08-connector/20-go.mdx @@ -15,7 +15,7 @@ import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx" import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx" import GoQuery from "../07-develop/04-query-data/_go.mdx" -`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。 +`driver-go` 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。 `driver-go` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。 @@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。 ```text username:password@protocol(address)/dbname?param=value ``` + ### 使用连接器进行连接 @@ -176,6 +177,7 @@ func main() { } } ``` + @@ -207,6 +209,7 @@ func main() { } } ``` + @@ -357,33 +360,32 @@ func main() { #### 订阅 -* `func NewConsumer(conf *Config) (*Consumer, error)` - -创建消费者。 +* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)` -* `func (c *Consumer) Subscribe(topics []string) error` + 创建消费者。 -订阅主题。 +* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error` +注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用 -* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` + 订阅单个主题。 -轮询消息。 +* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error` +注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用 -* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error` + 订阅主题。 -提交消息。 +* `func (c *Consumer) Poll(timeoutMs int) tmq.Event` -* `func (c *Consumer) FreeMessage(message unsafe.Pointer)` + 轮询消息。 -释放消息。 +* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)` +注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用 -* `func (c *Consumer) Unsubscribe() error` - -取消订阅。 + 提交消息。 * `func (c *Consumer) Close() error` -关闭消费者。 + 关闭连接。 #### schemaless @@ -443,25 +445,32 @@ func main() { ### 通过 WebSocket 订阅 -* `func NewConsumer(config *Config) (*Consumer, error)` +* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)` + + 创建消费者。 + +* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error` +注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用 - 创建消费者。 + 订阅单个主题。 -* `func (c *Consumer) Subscribe(topic []string) error` +* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error` +注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用 - 订阅主题。 + 订阅主题。 -* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` +* `func (c *Consumer) Poll(timeoutMs int) tmq.Event` - 轮询消息。 + 轮询消息。 -* `func (c *Consumer) Commit(messageID uint64) error` +* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)` +注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用 - 提交消息。 + 提交消息。 * `func (c *Consumer) Close() error` - 关闭消费者。 + 关闭连接。 完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) -- GitLab