未验证 提交 dba94bf5 编写于 作者: X Xuefeng Tan 提交者: GitHub

enh(driver-go): redesign of go tmq api (#19553)

* enh(driver-go): redesign of go tmq api

* docs: add comments for reserved parameter
Co-authored-by: sangshuduo's avatarShuduo Sang <sangshuduo@gmail.com>
上级 1094c376
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosadapter # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 69eee2e GIT_TAG 213f8b3
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -117,19 +117,22 @@ class TaosConsumer(): ...@@ -117,19 +117,22 @@ class TaosConsumer():
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
func NewConsumer(conf *Config) (*Consumer, error) func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
func (c *Consumer) Close() error // rebalanceCb is reserved for compatibility purpose
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer) // rebalanceCb is reserved for compatibility purpose
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeout time.Duration) (*Result, error) func (c *Consumer) Poll(timeoutMs int) tmq.Event
func (c *Consumer) Subscribe(topics []string) error // tmq.TopicPartition is reserved for compatibility purpose
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
``` ```
</TabItem> </TabItem>
...@@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> { ...@@ -357,50 +360,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
config := tmq.NewConfig() conf := &tmq.ConfigMap{
defer config.Destroy() "group.id": "test",
err = config.SetGroupID("test") "auto.offset.reset": "earliest",
if err != nil { "td.connect.ip": "127.0.0.1",
panic(err) "td.connect.user": "root",
} "td.connect.pass": "taosdata",
err = config.SetAutoOffsetReset("earliest") "td.connect.port": "6030",
if err != nil { "client.id": "test_tmq_c",
panic(err) "enable.auto.commit": "false",
} "enable.heartbeat.background": "true",
err = config.SetConnectIP("127.0.0.1") "experimental.snapshot.enable": "true",
if err != nil { "msg.with.table.name": "true",
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
} }
consumer, err := NewConsumer(conf)
``` ```
</TabItem> </TabItem>
...@@ -523,11 +496,7 @@ consumer.subscribe(topics); ...@@ -523,11 +496,7 @@ consumer.subscribe(topics);
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
consumer, err := tmq.NewConsumer(config) err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil { if err != nil {
panic(err) panic(err)
} }
...@@ -611,13 +580,17 @@ while(running){ ...@@ -611,13 +580,17 @@ while(running){
```go ```go
for { for {
result, err := consumer.Poll(time.Second) ev := consumer.Poll(0)
if err != nil { if ev != nil {
panic(err) switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
} }
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
} }
``` ```
...@@ -729,7 +702,11 @@ consumer.close(); ...@@ -729,7 +702,11 @@ consumer.close();
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
consumer.Close() /* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
``` ```
</TabItem> </TabItem>
......
...@@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man ...@@ -355,26 +355,29 @@ The `af` package encapsulates TDengine advanced functions such as connection man
#### Subscribe #### Subscribe
* `func NewConsumer(conf *Config) (*Consumer, error)` * `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group. Creates consumer group.
* `func (c *Consumer) Subscribe(topics []string) error` * `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes a topic.
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics. Subscribes to topics.
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` * `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information. Polling information.
* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error` * `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information. Commit information.
* `func (c *Consumer) FreeMessage(message unsafe.Pointer)`
Free information.
* `func (c *Consumer) Unsubscribe() error` * `func (c *Consumer) Unsubscribe() error`
Unsubscribe. Unsubscribe.
...@@ -441,25 +444,36 @@ Close consumer. ...@@ -441,25 +444,36 @@ Close consumer.
### Subscribe via WebSocket ### Subscribe via WebSocket
* `func NewConsumer(config *Config) (*Consumer, error)` * `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group. Creates consumer group.
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
* `func (c *Consumer) Subscribe(topic []string) error` Subscribes a topic.
Subscribes to topics. * `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` Subscribes to topics.
Polling information. * `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
* `func (c *Consumer) Commit(messageID uint64) error` Polling information.
Commit information. * `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
* `func (c *Consumer) Close() error` * `func (c *Consumer) Close() error`
Close consumer. Close consumer.
For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)
......
...@@ -2,5 +2,5 @@ module goexample ...@@ -2,5 +2,5 @@ module goexample
go 1.17 go 1.17
require github.com/taosdata/driver-go/v3 3.0 require github.com/taosdata/driver-go/v3 3.1.0
package main package main
import ( import (
"context"
"encoding/json"
"fmt" "fmt"
"strconv" "os"
"time"
"github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq" "github.com/taosdata/driver-go/v3/af/tmq"
"github.com/taosdata/driver-go/v3/common" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
"github.com/taosdata/driver-go/v3/errors"
"github.com/taosdata/driver-go/v3/wrapper"
) )
func main() { func main() {
...@@ -28,79 +23,56 @@ func main() { ...@@ -28,79 +23,56 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = config.SetAutoOffsetReset("earliest") consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
if err != nil { "group.id": "test",
panic(err) "auto.offset.reset": "earliest",
} "td.connect.ip": "127.0.0.1",
err = config.SetConnectIP("127.0.0.1") "td.connect.user": "root",
if err != nil { "td.connect.pass": "taosdata",
panic(err) "td.connect.port": "6030",
} "client.id": "test_tmq_client",
err = config.SetConnectUser("root") "enable.auto.commit": "false",
if err != nil { "enable.heartbeat.background": "true",
panic(err) "experimental.snapshot.enable": "true",
} "msg.with.table.name": "true",
err = config.SetConnectPass("taosdata") })
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = config.SetMsgWithTableName(true) err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = config.EnableHeartBeat() _, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { _, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil { if err != nil {
panic(err) panic(err)
} }
consumer, err := tmq.NewConsumer(config) for i := 0; i < 5; i++ {
if err != nil { ev := consumer.Poll(0)
panic(err) if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.String())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
} }
err = consumer.Subscribe([]string{"example_tmq_topic"}) consumer.Commit()
if err != nil {
panic(err)
} }
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
} }
_, err = db.Exec("insert into example_tmq.t1 values(now,1)") err = consumer.Unsubscribe()
if err != nil { if err != nil {
panic(err) panic(err)
} }
for { err = consumer.Close()
result, err := consumer.Poll(time.Second)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if result.Type != common.TMQ_RES_DATA {
panic("want message type 1 got " + strconv.Itoa(int(result.Type)))
}
data, _ := json.Marshal(result.Data)
fmt.Println(string(data))
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
break
}
consumer.Close()
} }
...@@ -115,19 +115,22 @@ class TaosConsumer(): ...@@ -115,19 +115,22 @@ class TaosConsumer():
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
func NewConsumer(conf *Config) (*Consumer, error) func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
func (c *Consumer) Close() error // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer) // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeout time.Duration) (*Result, error) func (c *Consumer) Poll(timeoutMs int) tmq.Event
func (c *Consumer) Subscribe(topics []string) error // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error
``` ```
</TabItem> </TabItem>
...@@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> { ...@@ -355,50 +358,20 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
config := tmq.NewConfig() conf := &tmq.ConfigMap{
defer config.Destroy() "group.id": "test",
err = config.SetGroupID("test") "auto.offset.reset": "earliest",
if err != nil { "td.connect.ip": "127.0.0.1",
panic(err) "td.connect.user": "root",
} "td.connect.pass": "taosdata",
err = config.SetAutoOffsetReset("earliest") "td.connect.port": "6030",
if err != nil { "client.id": "test_tmq_c",
panic(err) "enable.auto.commit": "false",
} "enable.heartbeat.background": "true",
err = config.SetConnectIP("127.0.0.1") "experimental.snapshot.enable": "true",
if err != nil { "msg.with.table.name": "true",
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
} }
consumer, err := NewConsumer(conf)
``` ```
</TabItem> </TabItem>
...@@ -532,11 +505,7 @@ consumer.subscribe(topics); ...@@ -532,11 +505,7 @@ consumer.subscribe(topics);
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
consumer, err := tmq.NewConsumer(config) err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil { if err != nil {
panic(err) panic(err)
} }
...@@ -620,13 +589,17 @@ while(running){ ...@@ -620,13 +589,17 @@ while(running){
```go ```go
for { for {
result, err := consumer.Poll(time.Second) ev := consumer.Poll(0)
if err != nil { if ev != nil {
panic(err) switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Println(e.Value())
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
} }
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
} }
``` ```
...@@ -738,7 +711,11 @@ consumer.close(); ...@@ -738,7 +711,11 @@ consumer.close();
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
consumer.Close() /* Unsubscribe */
_ = consumer.Unsubscribe()
/* Close consumer */
_ = consumer.Close()
``` ```
</TabItem> </TabItem>
......
...@@ -15,7 +15,7 @@ import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx" ...@@ -15,7 +15,7 @@ import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx" import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx" import GoQuery from "../07-develop/04-query-data/_go.mdx"
`driver-go` TDengine 的官方 Go 语言连接器,实现了 Go 语言[ database/sql ](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。 `driver-go` TDengine 的官方 Go 语言连接器,实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/) 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。
`driver-go` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。 `driver-go` 提供两种建立连接的方式。一种是**原生连接**,它通过 TDengine 客户端驱动程序(taosc)原生连接 TDengine 运行实例,支持数据写入、查询、订阅、schemaless 接口和参数绑定接口等功能。另外一种是 **REST 连接**,它通过 taosAdapter 提供的 REST 接口连接 TDengine 运行实例。REST 连接实现的功能特性集合和原生连接有少量不同。
...@@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。 ...@@ -112,6 +112,7 @@ REST 连接支持所有能运行 Go 的平台。
```text ```text
username:password@protocol(address)/dbname?param=value username:password@protocol(address)/dbname?param=value
``` ```
### 使用连接器进行连接 ### 使用连接器进行连接
<Tabs defaultValue="rest"> <Tabs defaultValue="rest">
...@@ -176,6 +177,7 @@ func main() { ...@@ -176,6 +177,7 @@ func main() {
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="WebSocket" label="WebSocket 连接"> <TabItem value="WebSocket" label="WebSocket 连接">
...@@ -207,6 +209,7 @@ func main() { ...@@ -207,6 +209,7 @@ func main() {
} }
} }
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -357,33 +360,32 @@ func main() { ...@@ -357,33 +360,32 @@ func main() {
#### 订阅 #### 订阅
* `func NewConsumer(conf *Config) (*Consumer, error)` * `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
创建消费者。
* `func (c *Consumer) Subscribe(topics []string) error` 创建消费者。
订阅主题。
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` * `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
轮询消息 订阅单个主题
* `func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error` * `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
提交消息 订阅主题
* `func (c *Consumer) FreeMessage(message unsafe.Pointer)` * `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
释放消息。 轮询消息。
* `func (c *Consumer) Unsubscribe() error` * `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
取消订阅 提交消息
* `func (c *Consumer) Close() error` * `func (c *Consumer) Close() error`
关闭消费者 关闭连接
#### schemaless #### schemaless
...@@ -443,25 +445,32 @@ func main() { ...@@ -443,25 +445,32 @@ func main() {
### 通过 WebSocket 订阅 ### 通过 WebSocket 订阅
* `func NewConsumer(config *Config) (*Consumer, error)` * `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
创建消费者。 创建消费者。
* `func (c *Consumer) Subscribe(topic []string) error` * `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅单个主题。
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅主题。 订阅主题。
* `func (c *Consumer) Poll(timeout time.Duration) (*Result, error)` * `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
轮询消息。 轮询消息。
* `func (c *Consumer) Commit(messageID uint64) error` * `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
提交消息。 提交消息。
* `func (c *Consumer) Close() error` * `func (c *Consumer) Close() error`
关闭消费者 关闭连接
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go) 完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/3.0/examples/tmqoverws/main.go)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册