---
toc_max_heading_level: 4
sidebar_position: 4
sidebar_label: Go
title: TDengine Go Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparation from "./_preparation.mdx"
import GoInsert from "../07-develop/03-insert-data/_go_sql.mdx"
import GoInfluxLine from "../07-develop/03-insert-data/_go_line.mdx"
import GoOpenTSDBTelnet from "../07-develop/03-insert-data/_go_opts_telnet.mdx"
import GoOpenTSDBJson from "../07-develop/03-insert-data/_go_opts_json.mdx"
import GoQuery from "../07-develop/04-query-data/_go.mdx"

`driver-go` is the official Go connector for TDengine. It implements the interface of Go's [database/sql](https://golang.org/pkg/database/sql/) package, so Go developers can use it to build applications that read and write data in a TDengine cluster.

`driver-go` offers two ways to establish a connection. A **native connection** connects to a running TDengine instance through the TDengine client driver (taosc) and supports data writing, querying, subscription, the schemaless interface, and the parameter binding interface. A **REST connection** connects to a running TDengine instance through the REST interface provided by taosAdapter. The feature set of a REST connection differs slightly from that of a native connection.

This document describes how to install `driver-go` and how to use it to connect to a TDengine cluster and perform basic operations such as querying and writing data.

The source code of `driver-go` is hosted on [GitHub](https://github.com/taosdata/driver-go).

## Supported platforms

Native connections support the same platforms as the TDengine client driver.
REST connections support every platform that can run Go.

## Version support

See the [version support list](https://github.com/taosdata/driver-go#remind).

## Handling exceptions

For a TDengine error, you can obtain the error code and error message as follows.

```go
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
	// A TDengine error carries a numeric code and a message.
	tError, is := err.(*errors.TaosError)
	if is {
		fmt.Println("errorCode:", int(tError.Code))
		fmt.Println("errorMessage:", tError.ErrStr)
	} else {
		fmt.Println(err.Error())
	}
}
```

## TDengine DataType and Go DataType

| TDengine DataType | Go Type   |
|-------------------|-----------|
| TIMESTAMP         | time.Time |
| TINYINT           | int8      |
| SMALLINT          | int16     |
| INT               | int32     |
| BIGINT            | int64     |
| TINYINT UNSIGNED  | uint8     |
| SMALLINT UNSIGNED | uint16    |
| INT UNSIGNED      | uint32    |
| BIGINT UNSIGNED   | uint64    |
| FLOAT             | float32   |
| DOUBLE            | float64   |
| BOOL              | bool      |
| BINARY            | string    |
| NCHAR             | string    |
| JSON              | []byte    |

**Note**: The JSON type is supported only in tags.

## Installation steps

### Prerequisites

* Install a Go development environment (Go 1.14 or later, GCC 4.8.5 or later)
* If you use the native connector, install the TDengine client driver. For details, see [Install the client driver](../#安装客户端驱动)

Configure the environment variables, then check them with:

* ```go env```
* ```gcc -v```

### Install the connector

1. Initialize the project with the `go mod` command:

   ```text
   go mod init taos-demo
   ```

2. Import taosSql:

   ```go
   import (
     "database/sql"
     _ "github.com/taosdata/driver-go/v3/taosSql"
   )
   ```

3. Update the dependencies with `go mod tidy`:

   ```text
   go mod tidy
   ```

4. Run the program with `go run taos-demo`, or compile a binary with `go build`.

   ```text
   go run taos-demo
   go build
   ```

## Establishing a connection

The data source name (DSN) has a common format, like that of [PEAR DB](http://pear.php.net/manual/en/package.database.db.intro-dsn.php), but without a type prefix (square brackets indicate optional parts):

``` text
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]
```

A DSN in its full form:

```text
username:password@protocol(address)/dbname?param=value
```

_taosSql_ implements Go's `database/sql/driver` interface through cgo. Just import the driver to use the [`database/sql`](https://golang.org/pkg/database/sql/) interface.

Use `taosSql` as the `driverName` and a valid [DSN](#DSN) as the `dataSourceName`. The DSN supports the following parameter:

* cfg: the directory that contains taos.cfg

Example:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/taosdata/driver-go/v3/taosSql"
)

func main() {
	var taosUri = "root:taosdata@tcp(localhost:6030)/"
	taos, err := sql.Open("taosSql", taosUri)
	if err != nil {
		fmt.Println("failed to connect TDengine, err:", err)
		return
	}
	// Release the handle on exit (an unused variable would not compile).
	defer taos.Close()
}
```

_taosRestful_ implements Go's `database/sql/driver` interface through an `http client`. Just import the driver to use the [`database/sql`](https://golang.org/pkg/database/sql/) interface.

Use `taosRestful` as the `driverName` and a valid [DSN](#DSN) as the `dataSourceName`. The DSN supports the following parameters:

* `disableCompression`: whether to refuse compressed data. The default is true, meaning compressed data is not accepted. Set it to false if the data is transferred with gzip compression.
* `readBufferSize`: the size of the buffer used to read data. The default is 4K (4096). Increase it when query results are large.

Example:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/taosdata/driver-go/v3/taosRestful"
)

func main() {
	var taosUri = "root:taosdata@http(localhost:6041)/"
	taos, err := sql.Open("taosRestful", taosUri)
	if err != nil {
		fmt.Println("failed to connect TDengine, err:", err)
		return
	}
	// Release the handle on exit (an unused variable would not compile).
	defer taos.Close()
}
```

_taosWS_ implements Go's `database/sql/driver` interface over `WebSocket`. Just import the driver (minimum driver-go version
3.0.2) to use the [`database/sql`](https://golang.org/pkg/database/sql/) interface.

Use `taosWS` as the `driverName` and a valid [DSN](#DSN) as the `dataSourceName`. The DSN supports the following parameters:

* `writeTimeout`: the timeout for sending data over WebSocket.
* `readTimeout`: the timeout for receiving response data over WebSocket.

Example:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/taosdata/driver-go/v3/taosWS"
)

func main() {
	var taosUri = "root:taosdata@ws(localhost:6041)/"
	taos, err := sql.Open("taosWS", taosUri)
	if err != nil {
		fmt.Println("failed to connect TDengine, err:", err)
		return
	}
	// Release the handle on exit (an unused variable would not compile).
	defer taos.Close()
}
```

### Obtaining a connection with a URL and Properties

The Go connector does not support this feature.

### Priority of configuration parameters

The Go connector does not support this feature.

## Usage examples

### Creating a database and tables

```go
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
	log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
// err is already declared above, so assign with "=" rather than ":=".
_, err = taos.Exec("CREATE DATABASE power")
if err != nil {
	log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
	log.Fatalln("failed to create stable, err:", err)
}
```

### Inserting data

### Querying data

### Executing SQL with a reqId

The reqId can be used to trace a request along its call chain.

```go
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
if err != nil {
	panic(err)
}
defer db.Close()
// Attach a request ID to the context; the driver reads it from common.ReqIDKey.
ctx := context.WithValue(context.Background(), common.ReqIDKey, common.GetReqID())
_, err = db.ExecContext(ctx, "create database if not exists example_taos_sql")
if err != nil {
	panic(err)
}
```

### Writing data with parameter binding

```go
package main

import (
	"time"

	"github.com/taosdata/driver-go/v3/af"
	"github.com/taosdata/driver-go/v3/common"
	"github.com/taosdata/driver-go/v3/common/param"
)

func main() {
	// Native connection through the af package.
	db, err := af.Open("", "root", "taosdata", "", 0)
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec("create database if not exists example_stmt")
	if err != nil {
		panic(err)
	}
	_, err = db.Exec("create table if not exists example_stmt.tb1(ts timestamp," +
		"c1 bool," +
		"c2 tinyint," +
		"c3 smallint," +
		"c4 int," +
		"c5 bigint," +
		"c6 tinyint unsigned," +
		"c7 smallint unsigned," +
		"c8 int unsigned," +
		"c9 bigint unsigned," +
		"c10 float," +
		"c11 double," +
		"c12 binary(20)," +
		"c13 nchar(20)" +
		")")
	if err != nil {
		panic(err)
	}
	stmt := db.InsertStmt()
	err = stmt.Prepare("insert into example_stmt.tb1 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
	if err != nil {
		panic(err)
	}
	now := time.Now()
	// One param.Param per column; each holds the values for two rows.
	params := make([]*param.Param, 14)
	params[0] = param.NewParam(2).
		AddTimestamp(now, common.PrecisionMilliSecond).
		AddTimestamp(now.Add(time.Second), common.PrecisionMilliSecond)
	params[1] = param.NewParam(2).AddBool(true).AddNull()
	params[2] = param.NewParam(2).AddTinyint(2).AddNull()
	params[3] = param.NewParam(2).AddSmallint(3).AddNull()
	params[4] = param.NewParam(2).AddInt(4).AddNull()
	params[5] = param.NewParam(2).AddBigint(5).AddNull()
	params[6] = param.NewParam(2).AddUTinyint(6).AddNull()
	params[7] = param.NewParam(2).AddUSmallint(7).AddNull()
	params[8] = param.NewParam(2).AddUInt(8).AddNull()
	params[9] = param.NewParam(2).AddUBigint(9).AddNull()
	params[10] = param.NewParam(2).AddFloat(10).AddNull()
	params[11] = param.NewParam(2).AddDouble(11).AddNull()
	params[12] = param.NewParam(2).AddBinary([]byte("binary")).AddNull()
	params[13] = param.NewParam(2).AddNchar("nchar").AddNull()

	paramTypes := param.NewColumnType(14).
		AddTimestamp().
		AddBool().
		AddTinyint().
		AddSmallint().
		AddInt().
		AddBigint().
		AddUTinyint().
		AddUSmallint().
		AddUInt().
		AddUBigint().
		AddFloat().
		AddDouble().
		AddBinary(6).
		AddNchar(5)
	err = stmt.BindParam(params, paramTypes)
	if err != nil {
		panic(err)
	}
	err = stmt.AddBatch()
	if err != nil {
		panic(err)
	}
	err = stmt.Execute()
	if err != nil {
		panic(err)
	}
	err = stmt.Close()
	if err != nil {
		panic(err)
	}
	// select * from example_stmt.tb1
}
```

```go
package main

import (
	"database/sql"
	"fmt"
	"time"

	"github.com/taosdata/driver-go/v3/common"
	"github.com/taosdata/driver-go/v3/common/param"
	_ "github.com/taosdata/driver-go/v3/taosRestful"
	"github.com/taosdata/driver-go/v3/ws/stmt"
)

func main() {
	// The REST connection only prepares the database and tables;
	// parameter binding itself goes through the WebSocket stmt connector.
	db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	prepareEnv(db)

	config := stmt.NewConfig("ws://127.0.0.1:6041/rest/stmt", 0)
	config.SetConnectUser("root")
	config.SetConnectPass("taosdata")
	config.SetConnectDB("example_ws_stmt")
	config.SetMessageTimeout(common.DefaultMessageTimeout)
	config.SetWriteWait(common.DefaultWriteWait)
	config.SetErrorHandler(func(connector *stmt.Connector, err error) {
		panic(err)
	})
	config.SetCloseHandler(func() {
		fmt.Println("stmt connector closed")
	})

	connector, err := stmt.NewConnector(config)
	if err != nil {
		panic(err)
	}
	now := time.Now()
	{
		// Sub-table of all_json: a single JSON tag.
		stmt, err := connector.Init()
		if err != nil {
			panic(err)
		}
		err = stmt.Prepare("insert into ? using all_json tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
		if err != nil {
			panic(err)
		}
		err = stmt.SetTableName("tb1")
		if err != nil {
			panic(err)
		}
		err = stmt.SetTags(param.NewParam(1).AddJson([]byte(`{"tb":1}`)), param.NewColumnType(1).AddJson(0))
		if err != nil {
			panic(err)
		}
		// One param.Param per column; each holds the values for three rows.
		params := []*param.Param{
			param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
			param.NewParam(3).AddBool(true).AddNull().AddBool(true),
			param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
			param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
			param.NewParam(3).AddInt(1).AddNull().AddInt(1),
			param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
			param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
			param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
			param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
			param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
			param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
			param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
			param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
			param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
		}
		paramTypes := param.NewColumnType(14).
			AddTimestamp().
			AddBool().
			AddTinyint().
			AddSmallint().
			AddInt().
			AddBigint().
			AddUTinyint().
			AddUSmallint().
			AddUInt().
			AddUBigint().
			AddFloat().
			AddDouble().
			AddBinary(0).
			AddNchar(0)
		err = stmt.BindParam(params, paramTypes)
		if err != nil {
			panic(err)
		}
		err = stmt.AddBatch()
		if err != nil {
			panic(err)
		}
		err = stmt.Exec()
		if err != nil {
			panic(err)
		}
		affected := stmt.GetAffectedRows()
		fmt.Println("all_json affected rows:", affected)
		err = stmt.Close()
		if err != nil {
			panic(err)
		}
	}
	{
		// Sub-table of all_all: one tag of every supported type.
		stmt, err := connector.Init()
		if err != nil {
			panic(err)
		}
		err = stmt.Prepare("insert into ? using all_all tags(?,?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
		if err != nil {
			panic(err)
		}
		err = stmt.SetTableName("tb1")
		if err != nil {
			panic(err)
		}

		err = stmt.SetTableName("tb2")
		if err != nil {
			panic(err)
		}
		err = stmt.SetTags(
			param.NewParam(14).
				AddTimestamp(now, 0).
				AddBool(true).
				AddTinyint(2).
				AddSmallint(2).
				AddInt(2).
				AddBigint(2).
				AddUTinyint(2).
				AddUSmallint(2).
				AddUInt(2).
				AddUBigint(2).
				AddFloat(2).
				AddDouble(2).
				AddBinary([]byte("tb2")).
				AddNchar("tb2"),
			param.NewColumnType(14).
				AddTimestamp().
				AddBool().
				AddTinyint().
				AddSmallint().
				AddInt().
				AddBigint().
				AddUTinyint().
				AddUSmallint().
				AddUInt().
				AddUBigint().
				AddFloat().
				AddDouble().
				AddBinary(0).
				AddNchar(0),
		)
		if err != nil {
			panic(err)
		}
		params := []*param.Param{
			param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
			param.NewParam(3).AddBool(true).AddNull().AddBool(true),
			param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
			param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
			param.NewParam(3).AddInt(1).AddNull().AddInt(1),
			param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
			param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
			param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
			param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
			param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
			param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
			param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
			param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
			param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
		}
		paramTypes := param.NewColumnType(14).
			AddTimestamp().
			AddBool().
			AddTinyint().
			AddSmallint().
			AddInt().
			AddBigint().
			AddUTinyint().
			AddUSmallint().
			AddUInt().
			AddUBigint().
			AddFloat().
			AddDouble().
			AddBinary(0).
			AddNchar(0)
		err = stmt.BindParam(params, paramTypes)
		if err != nil {
			panic(err)
		}
		err = stmt.AddBatch()
		if err != nil {
			panic(err)
		}
		err = stmt.Exec()
		if err != nil {
			panic(err)
		}
		affected := stmt.GetAffectedRows()
		fmt.Println("all_all affected rows:", affected)
		err = stmt.Close()
		if err != nil {
			panic(err)
		}
	}
}

// prepareEnv creates the database and the two super tables used above.
func prepareEnv(db *sql.DB) {
	steps := []string{
		"create database example_ws_stmt",
		"create table example_ws_stmt.all_json(ts timestamp," +
			"c1 bool," +
			"c2 tinyint," +
			"c3 smallint," +
			"c4 int," +
			"c5 bigint," +
			"c6 tinyint unsigned," +
			"c7 smallint unsigned," +
			"c8 int unsigned," +
			"c9 bigint unsigned," +
			"c10 float," +
			"c11 double," +
			"c12 binary(20)," +
			"c13 nchar(20)" +
			")" +
			"tags(t json)",
		"create table example_ws_stmt.all_all(" +
			"ts timestamp," +
			"c1 bool," +
			"c2 tinyint," +
			"c3 smallint," +
			"c4 int," +
			"c5 bigint," +
			"c6 tinyint unsigned," +
			"c7 smallint unsigned," +
			"c8 int unsigned," +
			"c9 bigint unsigned," +
			"c10 float," +
			"c11 double," +
			"c12 binary(20)," +
			"c13 nchar(20)" +
			")" +
			"tags(" +
			"tts timestamp," +
			"tc1 bool," +
			"tc2 tinyint," +
			"tc3 smallint," +
			"tc4 int," +
			"tc5 bigint," +
			"tc6 tinyint unsigned," +
			"tc7 smallint unsigned," +
			"tc8 int unsigned," +
			"tc9 bigint unsigned," +
			"tc10 float," +
			"tc11 double," +
			"tc12 binary(20)," +
			"tc13 nchar(20))",
	}
	for _, step := range steps {
		_, err := db.Exec(step)
		if err != nil {
			panic(err)
		}
	}
}
```

### Schemaless writing

```go
package main

import (
	"fmt"

	"github.com/taosdata/driver-go/v3/af"
)

func main() {
	// Native connection through the af package.
	conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
	if err != nil {
		fmt.Println("fail to connect, err:", err)
		// Stop here: conn must not be used after a failed Open.
		return
	}
	defer conn.Close()
	_, err = conn.Exec("create database if not exists example")
	if err != nil {
		panic(err)
	}
	_, err = conn.Exec("use example")
	if err != nil {
		panic(err)
	}
	influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
	err = conn.InfluxDBInsertLines([]string{influxdbData}, "ns")
	if err != nil {
		panic(err)
	}
	telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
	err = conn.OpenTSDBInsertTelnetLines([]string{telnetData})
	if err != nil {
		panic(err)
	}
	jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
	err = conn.OpenTSDBInsertJsonPayload(jsonData)
	if err != nil {
		panic(err)
	}
}
```

```go
package main

import (
	"database/sql"
	"log"
	"time"

	"github.com/taosdata/driver-go/v3/common"
	_ "github.com/taosdata/driver-go/v3/taosWS"
	"github.com/taosdata/driver-go/v3/ws/schemaless"
)

func main() {
	// WebSocket connection: create the target database first.
	db, err := sql.Open("taosWS", "root:taosdata@ws(localhost:6041)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	_, err = db.Exec("create database if not exists schemaless_ws")
	if err != nil {
		log.Fatal(err)
	}
	s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041/rest/schemaless", 1,
		schemaless.SetDb("schemaless_ws"),
		schemaless.SetReadTimeout(10*time.Second),
		schemaless.SetWriteTimeout(10*time.Second),
		schemaless.SetUser("root"),
		schemaless.SetPassword("taosdata"),
		schemaless.SetErrorHandler(func(err error) {
			log.Fatal(err)
		}),
	))
	if err != nil {
		panic(err)
	}
	influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
	telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
	jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"

	err = s.Insert(influxdbData, schemaless.InfluxDBLineProtocol, "ns", 0, common.GetReqID())
	if err != nil {
		panic(err)
	}
	err = s.Insert(telnetData, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
	if err != nil {
		panic(err)
	}
	err = s.Insert(jsonData, schemaless.OpenTSDBJsonFormatProtocol, "ms", 0, common.GetReqID())
	if err != nil {
		panic(err)
	}
}
```

### Schemaless writing with a reqId

```go
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
```

You can obtain a unique ID with `common.GetReqID()`.

### Data subscription

The TDengine Go connector supports data subscription. The API is used as follows:

#### Create a topic

```go
	db, err := af.Open("", "root", "taosdata", "", 0)
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
	if err != nil {
		panic(err)
	}
	_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
	if err != nil {
		panic(err)
	}
```

#### Create a consumer

```go
	consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
		"group.id":            "test",
		"auto.offset.reset":   "earliest",
		"td.connect.ip":       "127.0.0.1",
		"td.connect.user":     "root",
		"td.connect.pass":     "taosdata",
		"td.connect.port":     "6030",
		"client.id":           "test_tmq_client",
		"enable.auto.commit":  "false",
		"msg.with.table.name": "true",
	})
	if err != nil {
		panic(err)
	}
```

#### Subscribe and consume data

```go
	err = consumer.Subscribe("example_tmq_topic", nil)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		// Wait up to 500 ms for the next event.
		ev := consumer.Poll(500)
		if ev != nil {
			switch e := ev.(type) {
			case *tmqcommon.DataMessage:
				fmt.Printf("get message:%v\n", e)
			case tmqcommon.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				panic(e)
			}
			consumer.Commit()
		}
	}
```

#### Specify the subscription offset

```go
	partitions, err := consumer.Assignment()
	if err != nil {
		panic(err)
	}
	for i := 0; i < len(partitions); i++ {
		fmt.Println(partitions[i])
		// Rewind each assigned partition to offset 0.
		err = consumer.Seek(tmqcommon.TopicPartition{
			Topic:     partitions[i].Topic,
			Partition: partitions[i].Partition,
			Offset:    0,
		}, 0)
		if err != nil {
			panic(err)
		}
	}
```

#### Close the subscription

```go
	err = consumer.Close()
	if err != nil {
		panic(err)
	}
```

#### Complete examples

```go
package main

import (
	"fmt"
	"os"

	"github.com/taosdata/driver-go/v3/af"
	"github.com/taosdata/driver-go/v3/af/tmq"
	tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)

func main() {
	// Native connection through the af package.
	db, err := af.Open("", "root", "taosdata", "", 0)
	if err != nil {
		panic(err)
	}
	defer db.Close()
	_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
	if err != nil {
		panic(err)
	}
	_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
	if err != nil {
		panic(err)
	}
	consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
		"group.id":            "test",
		"auto.offset.reset":   "earliest",
		"td.connect.ip":       "127.0.0.1",
		"td.connect.user":     "root",
		"td.connect.pass":     "taosdata",
		"td.connect.port":     "6030",
		"client.id":           "test_tmq_client",
		"enable.auto.commit":  "false",
		"msg.with.table.name": "true",
	})
	if err != nil {
		panic(err)
	}
	err = consumer.Subscribe("example_tmq_topic", nil)
	if err != nil {
		panic(err)
	}
	// Write one row so the consumer has something to receive.
	_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
	if err != nil {
		panic(err)
	}
	_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 5; i++ {
		ev := consumer.Poll(500)
		if ev != nil {
			switch e := ev.(type) {
			case *tmqcommon.DataMessage:
				fmt.Printf("get message:%v\n", e)
			case tmqcommon.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				panic(e)
			}
			consumer.Commit()
		}
	}
	partitions, err := consumer.Assignment()
	if err != nil {
		panic(err)
	}
	for i := 0; i < len(partitions); i++ {
		fmt.Println(partitions[i])
		err = consumer.Seek(tmqcommon.TopicPartition{
			Topic:     partitions[i].Topic,
			Partition: partitions[i].Partition,
			Offset:    0,
		}, 0)
		if err != nil {
			panic(err)
		}
	}

	// Print the assignment again after the seek.
	partitions, err = consumer.Assignment()
	if err != nil {
		panic(err)
	}
	for i := 0; i < len(partitions); i++ {
		fmt.Println(partitions[i])
	}

	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}
```

```go
package main

import (
	"database/sql"
	"fmt"

	"github.com/taosdata/driver-go/v3/common"
	tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
	_ "github.com/taosdata/driver-go/v3/taosRestful"
	"github.com/taosdata/driver-go/v3/ws/tmq"
)

func main() {
	// The REST connection prepares the database and topic;
	// the subscription itself runs over WebSocket.
	db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	prepareEnv(db)
	consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
		"ws.url":
		"ws://127.0.0.1:6041/rest/tmq",
		"ws.message.channelLen": uint(0),
		"ws.message.timeout":    common.DefaultMessageTimeout,
		"ws.message.writeWait":  common.DefaultWriteWait,
		"td.connect.user":       "root",
		"td.connect.pass":       "taosdata",
		"group.id":              "example",
		"client.id":             "example_consumer",
		"auto.offset.reset":     "earliest",
	})
	if err != nil {
		panic(err)
	}
	err = consumer.Subscribe("example_ws_tmq_topic", nil)
	if err != nil {
		panic(err)
	}
	// Write one row in the background so the consumer has something to receive.
	go func() {
		_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
			"c1 bool," +
			"c2 tinyint," +
			"c3 smallint," +
			"c4 int," +
			"c5 bigint," +
			"c6 tinyint unsigned," +
			"c7 smallint unsigned," +
			"c8 int unsigned," +
			"c9 bigint unsigned," +
			"c10 float," +
			"c11 double," +
			"c12 binary(20)," +
			"c13 nchar(20)" +
			")")
		if err != nil {
			panic(err)
		}
		_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
		if err != nil {
			panic(err)
		}
	}()
	for i := 0; i < 5; i++ {
		ev := consumer.Poll(500)
		if ev != nil {
			switch e := ev.(type) {
			case *tmqcommon.DataMessage:
				fmt.Printf("get message:%v\n", e)
			case tmqcommon.Error:
				fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
				panic(e)
			}
			consumer.Commit()
		}
	}
	partitions, err := consumer.Assignment()
	if err != nil {
		panic(err)
	}
	for i := 0; i < len(partitions); i++ {
		fmt.Println(partitions[i])
		err = consumer.Seek(tmqcommon.TopicPartition{
			Topic:     partitions[i].Topic,
			Partition: partitions[i].Partition,
			Offset:    0,
		}, 0)
		if err != nil {
			panic(err)
		}
	}

	// Print the assignment again after the seek.
	partitions, err = consumer.Assignment()
	if err != nil {
		panic(err)
	}
	for i := 0; i < len(partitions); i++ {
		fmt.Println(partitions[i])
	}

	err = consumer.Close()
	if err != nil {
		panic(err)
	}
}

// prepareEnv creates the database and topic used by the consumer.
func prepareEnv(db *sql.DB) {
	_, err := db.Exec("create database example_ws_tmq WAL_RETENTION_PERIOD 86400")
	if err != nil {
		panic(err)
	}
	_, err = db.Exec("create topic example_ws_tmq_topic as database example_ws_tmq")
	if err != nil {
		panic(err)
	}
}
```

### More example programs

* [Example programs](https://github.com/taosdata/driver-go/tree/3.0/examples)
* [Video tutorial](https://www.taosdata.com/blog/2020/11/11/1951.html)

## Frequently asked questions

1. The stmt (parameter binding) interfaces in database/sql crash

   REST does not support the parameter binding interfaces. Use `db.Exec` and `db.Query` instead.

2. Statements executed after a `use db` statement fail with `[0x217] Database not specified or available`

   In the REST interface, SQL statements are executed without any shared context, so a `use db` statement has no effect. Specify the database explicitly instead, for example by qualifying table names with the database name, as in `power.meters`.

3. A program works with taosSql but fails with taosRestful, reporting `[0x217] Database not specified or available`

   The REST interface is stateless, so a `use db` statement has no effect. Specify the database explicitly instead, for example by qualifying table names with the database name, as in `power.meters`.

4. Increasing `readBufferSize` has no noticeable effect

   A larger `readBufferSize` reduces the number of `syscall` invocations needed to fetch results. If the query result is small, changing this parameter brings no noticeable improvement. If the value is set too large, parsing the JSON data becomes the bottleneck. To optimize query speed, tune the value for your actual workload.

5. Query efficiency drops when `disableCompression` is set to `false`

   When `disableCompression` is set to `false`, query results are compressed with `gzip` before transfer, and the data must be decompressed with `gzip` after it is received.

6. The `go get` command cannot fetch packages, or times out

   Set a Go proxy: `go env -w GOPROXY=https://goproxy.cn,direct`.

## API reference

For the full API, see the [driver-go documentation](https://pkg.go.dev/github.com/taosdata/driver-go/v3).
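
As a closing illustration of the DSN format described in the connection section (`[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]`), the following is a minimal sketch of assembling such a string. `buildDSN` is a hypothetical helper written for this page, not part of `driver-go`, and it uses only the Go standard library. It assumes parameter values need no escaping. Putting a database name into the DSN is one plausible way to avoid relying on `use db` over REST, but that behavior should be verified against your driver version.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildDSN is a hypothetical helper (not part of driver-go) that assembles
// a DSN of the form user:pass@protocol(address)/dbname?k1=v1&k2=v2.
// Empty arguments are omitted, mirroring the optional parts of the format.
func buildDSN(user, pass, protocol, addr, db string, params map[string]string) string {
	var b strings.Builder
	if user != "" {
		b.WriteString(user)
		if pass != "" {
			b.WriteString(":" + pass)
		}
		b.WriteString("@")
	}
	if protocol != "" {
		b.WriteString(protocol)
		if addr != "" {
			b.WriteString("(" + addr + ")")
		}
	}
	// The slash is always present, even when no database is given.
	b.WriteString("/" + db)
	if len(params) > 0 {
		// Sort the keys so the output is deterministic.
		keys := make([]string, 0, len(params))
		for k := range params {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		pairs := make([]string, 0, len(keys))
		for _, k := range keys {
			pairs = append(pairs, k+"="+params[k])
		}
		b.WriteString("?" + strings.Join(pairs, "&"))
	}
	return b.String()
}

func main() {
	// REST DSN with a database name and the two REST parameters from this page.
	fmt.Println(buildDSN("root", "taosdata", "http", "localhost:6041", "power",
		map[string]string{"readBufferSize": "8192", "disableCompression": "false"}))
	// Native DSN without a database name, as used in the connection examples.
	fmt.Println(buildDSN("root", "taosdata", "tcp", "localhost:6030", "", nil))
}
```

The resulting string can be passed as the `dataSourceName` argument of `sql.Open` together with the matching `driverName` (`taosSql`, `taosRestful`, or `taosWS`).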