提交 0df73c4b 编写于 作者: T t_max

docs: update driver-go document

上级 f49ce058
......@@ -31,42 +31,57 @@ REST connections are supported on all platforms that can run Go.
Please refer to [version support list](https://github.com/taosdata/driver-go#remind)
## Supported features
## Handling exceptions
### Native connections
A "native connection" is established by the connector directly to the TDengine instance via the TDengine client driver (taosc). The supported functional features are:
* Normal queries
* Continuous queries
* Subscriptions
* Schemaless interface
* Parameter binding interface
### REST connection
A "REST connection" is a connection between the application and the TDengine instance via the REST API provided by the taosAdapter component. The following features are supported:
If it is a TDengine error, you can get the error code and error information in the following ways.
```go
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}
```
* Normal queries
* Continuous queries
## TDengine DataType vs. Go DataType
| TDengine DataType | Go Type |
|-------------------|-----------|
| TIMESTAMP | time.Time |
| TINYINT | int8 |
| SMALLINT | int16 |
| INT | int32 |
| BIGINT | int64 |
| TINYINT UNSIGNED | uint8 |
| SMALLINT UNSIGNED | uint16 |
| INT UNSIGNED | uint32 |
| BIGINT UNSIGNED | uint64 |
| FLOAT | float32 |
| DOUBLE | float64 |
| BOOL | bool |
| BINARY | string |
| NCHAR | string |
| JSON | []byte |
**Note**: Only TAG supports JSON types
## Installation Steps
### Pre-installation preparation
* Install Go development environment (Go 1.14 and above, GCC 4.8.5 and above)
- If you use the native connector, please install the TDengine client driver. Please refer to [Install Client Driver](/reference/connector/#install-client-driver) for specific steps
* If you use the native connector, please install the TDengine client driver. Please refer to [Install Client Driver](/reference/connector/#install-client-driver) for specific steps
Configure the environment variables and check the command.
* ```go env```
* ```gcc -v```
### Use go get to install
`go get -u github.com/taosdata/driver-go/v3@latest`
### Manage with go mod
### Install the connectors
1. Initialize the project with the `go mod` command.
......@@ -98,8 +113,6 @@ Configure the environment variables and check the command.
## Establishing a connection
### Data source name (DSN)
Data source names have a standard format, e.g. [PEAR DB](http://pear.php.net/manual/en/package.database.db.intro-dsn.php), but no type prefix (square brackets indicate optionally):
``` text
......@@ -111,9 +124,7 @@ DSN in full form.
```text
username:password@protocol(address)/dbname?param=value
```
### Connecting via connector
<Tabs defaultValue="rest">
<Tabs defaultValue="rest" groupId="connect">
<TabItem value="native" label="native connection">
_taosSql_ implements Go's `database/sql/driver` interface via cgo. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply introducing the driver.
......@@ -209,340 +220,902 @@ func main() {
</TabItem>
</Tabs>
## Usage examples
### Write data
### Specify the URL and Properties to get the connection
#### SQL Write
The Go connector does not support this feature
<GoInsert />
### Priority of configuration parameters
#### InfluxDB line protocol write
The Go connector does not support this feature
<GoInfluxLine />
## Usage examples
#### OpenTSDB Telnet line protocol write
### Create database and tables
<GoOpenTSDBTelnet />
```go
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
log.Fatalln("failed to create stable, err:", err)
}
```
#### OpenTSDB JSON line protocol write
### Insert data
<GoOpenTSDBJson />
<GoInsert />
### Query data
### Querying data
<GoQuery />
### More sample programs
* [sample program](https://github.com/taosdata/driver-go/tree/3.0/examples)
### execute SQL with reqId
## Usage limitations
This reqId can be used to request link tracing.
Since the REST interface is stateless, the `use db` syntax will not work. You need to put the db name into the SQL command, e.g. `create table if not exists tb1 (ts timestamp, a int)` to `create table if not exists test.tb1 (ts timestamp, a int)` otherwise it will report the error `[0x217] Database not specified or available`.
```go
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
if err != nil {
panic(err)
}
defer db.Close()
ctx := context.WithValue(context.Background(), common.ReqIDKey, common.GetReqID())
_, err = db.ExecContext(ctx, "create database if not exists example_taos_sql")
if err != nil {
panic(err)
}
```
You can also put the db name in the DSN by changing `root:taosdata@http(localhost:6041)/` to `root:taosdata@http(localhost:6041)/test`. Executing the `create database` statement when the specified db does not exist will not report an error while executing other queries or writing against that db will report an error.
### Writing data via parameter binding
The complete example is as follows.
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="native connection">
```go
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/test"
taos, err := sql.Open("taosRestful", taosDSN)
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
panic(err)
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
defer db.Close()
_, err = db.Exec("create database if not exists example_stmt")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
panic(err)
}
rows, err := taos.Query("select * from tb1")
_, err = db.Exec("create table if not exists example_stmt.tb1(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
panic(err)
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
stmt := db.InsertStmt()
err = stmt.Prepare("insert into example_stmt.tb1 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
if err != nil {
panic(err)
}
err := rows.Scan(&r.ts, &r.a)
now := time.Now()
params := make([]*param.Param, 14)
params[0] = param.NewParam(2).
AddTimestamp(now, common.PrecisionMilliSecond).
AddTimestamp(now.Add(time.Second), common.PrecisionMilliSecond)
params[1] = param.NewParam(2).AddBool(true).AddNull()
params[2] = param.NewParam(2).AddTinyint(2).AddNull()
params[3] = param.NewParam(2).AddSmallint(3).AddNull()
params[4] = param.NewParam(2).AddInt(4).AddNull()
params[5] = param.NewParam(2).AddBigint(5).AddNull()
params[6] = param.NewParam(2).AddUTinyint(6).AddNull()
params[7] = param.NewParam(2).AddUSmallint(7).AddNull()
params[8] = param.NewParam(2).AddUInt(8).AddNull()
params[9] = param.NewParam(2).AddUBigint(9).AddNull()
params[10] = param.NewParam(2).AddFloat(10).AddNull()
params[11] = param.NewParam(2).AddDouble(11).AddNull()
params[12] = param.NewParam(2).AddBinary([]byte("binary")).AddNull()
params[13] = param.NewParam(2).AddNchar("nchar").AddNull()
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(6).
AddNchar(5)
err = stmt.BindParam(params, paramTypes)
if err != nil {
fmt.Println("scan error:\n", err)
return
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Execute()
if err != nil {
panic(err)
}
fmt.Println(r.ts, r.a)
err = stmt.Close()
if err != nil {
panic(err)
}
// select * from example_stmt.tb1
}
```
## Frequently Asked Questions
1. bind interface in database/sql crashes
REST does not support parameter binding related interface. It is recommended to use `db.Exec` and `db.Query`.
2. error `[0x217] Database not specified or available` after executing other statements with `use db` statement
The execution of SQL command in the REST interface is not contextual, so using `use db` statement will not work, see the usage restrictions section above.
3. use `taosSql` without error but use `taosRestful` with error `[0x217] Database not specified or available`
Because the REST interface is stateless, using the `use db` statement will not take effect. See the usage restrictions section above.
4. `readBufferSize` parameter has no significant effect after being increased
Increasing `readBufferSize` will reduce the number of `syscall` calls when fetching results. If the query result is smaller, modifying this parameter will not improve performance significantly. If you increase the parameter value too much, the bottleneck will be parsing JSON data. If you need to optimize the query speed, you must adjust the value based on the actual situation to achieve the best query performance.
5. `disableCompression` parameter is set to `false` when the query efficiency is reduced
When set `disableCompression` parameter to `false`, the query result will be compressed by `gzip` and then transmitted, so you have to decompress the data by `gzip` after getting it.
6. `go get` command can't get the package, or timeout to get the package
Set Go proxy `go env -w GOPROXY=https://goproxy.cn,direct`.
## Common APIs
### database/sql API
* `sql.Open(DRIVER_NAME string, dataSourceName string) *DB`
Use This API to open a DB, returning an object of type \*DB.
:::info
This API is created successfully without checking permissions, but only when you execute a Query or Exec, and check if user/password/host/port is legal.
:::
* `func (db *DB) Exec(query string, args ...interface{}) (Result, error)`
`sql.Open` built-in method to execute non-query related SQL.
* `func (db *DB) Query(query string, args ...interface{}) (*Rows, error)`
`sql.Open` Built-in method to execute query statements.
### Advanced functions (af) API
The `af` package encapsulates TDengine advanced functions such as connection management, subscriptions, schemaless, parameter binding, etc.
#### Connection management
* `af.Open(host, user, pass, db string, port int) (*Connector, error)`
This API creates a connection to taosd via cgo.
* `func (conn *Connector) Close() error`
Closes the connection.
#### Subscribe
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
Creates consumer group.
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes a topic.
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
Subscribes to topics.
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
Polling information.
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
Commit information.
* `func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)`
Get Assignment(TDengine >= 3.0.5.0 and driver-go >= v3.5.0 are required).
* `func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error`
Note: `ignoredTimeoutMs` is reserved for compatibility purpose
Seek offset(TDengine >= 3.0.5.0 and driver-go >= v3.5.0 are required).
* `func (c *Consumer) Unsubscribe() error`
Unsubscribe.
* `func (c *Consumer) Close() error`
Close consumer.
#### schemaless
* `func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error`
</TabItem>
<TabItem value="WebSocket" label="WebSocket connection">
Write to InfluxDB line protocol.
```go
package main
* `func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error`
import (
"database/sql"
"fmt"
"time"
Write OpenTDSB telnet protocol data.
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/stmt"
)
* `func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error`
func main() {
db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
if err != nil {
panic(err)
}
defer db.Close()
prepareEnv(db)
config := stmt.NewConfig("ws://127.0.0.1:6041/rest/stmt", 0)
config.SetConnectUser("root")
config.SetConnectPass("taosdata")
config.SetConnectDB("example_ws_stmt")
config.SetMessageTimeout(common.DefaultMessageTimeout)
config.SetWriteWait(common.DefaultWriteWait)
config.SetErrorHandler(func(connector *stmt.Connector, err error) {
panic(err)
})
config.SetCloseHandler(func() {
fmt.Println("stmt connector closed")
})
connector, err := stmt.NewConnector(config)
if err != nil {
panic(err)
}
now := time.Now()
{
stmt, err := connector.Init()
if err != nil {
panic(err)
}
err = stmt.Prepare("insert into ? using all_json tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
if err != nil {
panic(err)
}
err = stmt.SetTableName("tb1")
if err != nil {
panic(err)
}
err = stmt.SetTags(param.NewParam(1).AddJson([]byte(`{"tb":1}`)), param.NewColumnType(1).AddJson(0))
if err != nil {
panic(err)
}
params := []*param.Param{
param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
param.NewParam(3).AddBool(true).AddNull().AddBool(true),
param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
param.NewParam(3).AddInt(1).AddNull().AddInt(1),
param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
}
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0)
err = stmt.BindParam(params, paramTypes)
if err != nil {
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Exec()
if err != nil {
panic(err)
}
affected := stmt.GetAffectedRows()
fmt.Println("all_json affected rows:", affected)
err = stmt.Close()
if err != nil {
panic(err)
}
}
{
stmt, err := connector.Init()
if err != nil {
panic(err)
}
err = stmt.Prepare("insert into ? using all_all tags(?,?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
err = stmt.SetTableName("tb1")
if err != nil {
panic(err)
}
Writes OpenTSDB JSON protocol data.
err = stmt.SetTableName("tb2")
if err != nil {
panic(err)
}
err = stmt.SetTags(
param.NewParam(14).
AddTimestamp(now, 0).
AddBool(true).
AddTinyint(2).
AddSmallint(2).
AddInt(2).
AddBigint(2).
AddUTinyint(2).
AddUSmallint(2).
AddUInt(2).
AddUBigint(2).
AddFloat(2).
AddDouble(2).
AddBinary([]byte("tb2")).
AddNchar("tb2"),
param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0),
)
if err != nil {
panic(err)
}
params := []*param.Param{
param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
param.NewParam(3).AddBool(true).AddNull().AddBool(true),
param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
param.NewParam(3).AddInt(1).AddNull().AddInt(1),
param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
}
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0)
err = stmt.BindParam(params, paramTypes)
if err != nil {
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Exec()
if err != nil {
panic(err)
}
affected := stmt.GetAffectedRows()
fmt.Println("all_all affected rows:", affected)
err = stmt.Close()
if err != nil {
panic(err)
}
#### parameter binding
}
}
* `func (conn *Connector) StmtExecute(sql string, params *param.Param) (res driver.Result, err error)`
func prepareEnv(db *sql.DB) {
steps := []string{
"create database example_ws_stmt",
"create table example_ws_stmt.all_json(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")" +
"tags(t json)",
"create table example_ws_stmt.all_all(" +
"ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")" +
"tags(" +
"tts timestamp," +
"tc1 bool," +
"tc2 tinyint," +
"tc3 smallint," +
"tc4 int," +
"tc5 bigint," +
"tc6 tinyint unsigned," +
"tc7 smallint unsigned," +
"tc8 int unsigned," +
"tc9 bigint unsigned," +
"tc10 float," +
"tc11 double," +
"tc12 binary(20)," +
"tc13 nchar(20))",
}
for _, step := range steps {
_, err := db.Exec(step)
if err != nil {
panic(err)
}
}
}
Parameter bound single row insert.
```
* `func (conn *Connector) InsertStmt() *insertstmt.InsertStmt`
</TabItem>
</Tabs>
Initialize the parameters.
* `func (stmt *InsertStmt) Prepare(sql string) error`
### Schemaless Writing
Parameter binding preprocessing SQL statement.
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="native connection">
* `func (stmt *InsertStmt) SetTableName(name string) error`
```go
import (
"fmt"
Bind the table name parameter.
"github.com/taosdata/driver-go/v3/af"
)
* `func (stmt *InsertStmt) SetSubTableName(name string) error`
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
}
defer conn.Close()
_, err = conn.Exec("create database if not exists example")
if err != nil {
panic(err)
}
_, err = conn.Exec("use example")
if err != nil {
panic(err)
}
influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
err = conn.InfluxDBInsertLines([]string{influxdbData}, "ns")
if err != nil {
panic(err)
}
telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
err = conn.OpenTSDBInsertTelnetLines([]string{telnetData})
if err != nil {
panic(err)
}
jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
err = conn.OpenTSDBInsertJsonPayload(jsonData)
if err != nil {
panic(err)
}
}
```
Parameter binding to set the sub table name.
</TabItem>
<TabItem value="WebSocket" label="WebSocket connection">
* `func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnType) error`
```go
import (
"database/sql"
"log"
"time"
Parameter bind multiple rows of data.
"github.com/taosdata/driver-go/v3/common"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/schemaless"
)
* `func (stmt *InsertStmt) AddBatch() error`
func main() {
db, err := sql.Open("taosWS", "root:taosdata@ws(localhost:6041)/")
if err != nil {
log.Fatal(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists schemaless_ws")
if err != nil {
log.Fatal(err)
}
s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041/rest/schemaless", 1,
schemaless.SetDb("schemaless_ws"),
schemaless.SetReadTimeout(10*time.Second),
schemaless.SetWriteTimeout(10*time.Second),
schemaless.SetUser("root"),
schemaless.SetPassword("taosdata"),
schemaless.SetErrorHandler(func(err error) {
log.Fatal(err)
}),
))
if err != nil {
panic(err)
}
influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
Add to a parameter-bound batch.
err = s.Insert(influxdbData, schemaless.InfluxDBLineProtocol, "ns", 0, common.GetReqID())
if err != nil {
panic(err)
}
err = s.Insert(telnetData, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
panic(err)
}
err = s.Insert(jsonData, schemaless.OpenTSDBJsonFormatProtocol, "ms", 0, common.GetReqID())
if err != nil {
panic(err)
}
}
```
* `func (stmt *InsertStmt) Execute() error`
</TabItem>
</Tabs>
Execute a parameter binding.
* `func (stmt *InsertStmt) GetAffectedRows() int`
### Schemaless with reqId
Gets the number of affected rows inserted by the parameter binding.
```go
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
```
* `func (stmt *InsertStmt) Close() error`
You can get the unique id by `common.GetReqID()`.
Closes the parameter binding.
### Data Subscription
### Subscribe via WebSocket
The TDengine Go Connector supports subscription functionality with the following application API.
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
#### Create a Topic
Creates consumer group.
```go
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
```
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
#### Create a Consumer
Subscribes a topic.
```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
```
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
Note: `rebalanceCb` is reserved for compatibility purpose
#### Subscribe to consume data
Subscribes to topics.
```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
```
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
#### Assignment subscription Offset
Polling information.
```go
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
```
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
Note: `tmq.TopicPartition` is reserved for compatibility purpose
#### Close subscriptions
Commit information.
```go
err = consumer.Close()
if err != nil {
panic(err)
}
```
* `func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)`
#### Full Sample Code
Get Assignment(TDengine >= 3.0.5.0 and driver-go >= v3.5.0 are required).
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="native connection">
* `func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error`
Note: `ignoredTimeoutMs` is reserved for compatibility purpose
```go
package main
Seek offset(TDengine >= 3.0.5.0 and driver-go >= v3.5.0 are required).
import (
"fmt"
"os"
* `func (c *Consumer) Unsubscribe() error`
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
Unsubscribe.
func main() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
* `func (c *Consumer) Close() error`
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
}
Close consumer.
err = consumer.Close()
if err != nil {
panic(err)
}
}
```
For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/main/examples/tmqoverws/main.go)
</TabItem>
<TabItem value="WebSocket" label="WebSocket connection">
### parameter binding via WebSocket
```go
package main
* `func NewConnector(config *Config) (*Connector, error)`
import (
"database/sql"
"fmt"
Create a connection.
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
* `func (c *Connector) Init() (*Stmt, error)`
func main() {
db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
if err != nil {
panic(err)
}
defer db.Close()
prepareEnv(db)
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": "ws://127.0.0.1:6041/rest/tmq",
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"group.id": "example",
"client.id": "example_consumer",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_ws_tmq_topic", nil)
if err != nil {
panic(err)
}
go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
Initialize the parameters.
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
}
* `func (c *Connector) Close() error`
err = consumer.Close()
if err != nil {
panic(err)
}
}
Close the connection.
func prepareEnv(db *sql.DB) {
_, err := db.Exec("create database example_ws_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic example_ws_tmq_topic as database example_ws_tmq")
if err != nil {
panic(err)
}
}
```
* `func (s *Stmt) Prepare(sql string) error`
</TabItem>
</Tabs>
Parameter binding preprocessing SQL statement.
### More sample programs
* `func (s *Stmt) SetTableName(name string) error`
* [sample program](https://github.com/taosdata/driver-go/tree/3.0/examples)
Bind the table name parameter.
* `func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) error`
## Frequently Asked Questions
Set tags.
1. bind interface in database/sql crashes
* `func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error`
REST does not support parameter binding related interface. It is recommended to use `db.Exec` and `db.Query`.
Parameter bind multiple rows of data.
2. error `[0x217] Database not specified or available` after executing other statements with `use db` statement
* `func (s *Stmt) AddBatch() error`
The execution of SQL command in the REST interface is not contextual, so using `use db` statement will not work, see the usage restrictions section above.
Add to a parameter-bound batch.
3. use `taosSql` without error but use `taosRestful` with error `[0x217] Database not specified or available`
* `func (s *Stmt) Exec() error`
Because the REST interface is stateless, using the `use db` statement will not take effect. See the usage restrictions section above.
Execute a parameter binding.
4. `readBufferSize` parameter has no significant effect after being increased
* `func (s *Stmt) GetAffectedRows() int`
Increasing `readBufferSize` will reduce the number of `syscall` calls when fetching results. If the query result is smaller, modifying this parameter will not improve performance significantly. If you increase the parameter value too much, the bottleneck will be parsing JSON data. If you need to optimize the query speed, you must adjust the value based on the actual situation to achieve the best query performance.
Gets the number of affected rows inserted by the parameter binding.
5. `disableCompression` parameter is set to `false` when the query efficiency is reduced
* `func (s *Stmt) Close() error`
When set `disableCompression` parameter to `false`, the query result will be compressed by `gzip` and then transmitted, so you have to decompress the data by `gzip` after getting it.
Closes the parameter binding.
6. `go get` command can't get the package, or timeout to get the package
For a complete example see [GitHub sample file](https://github.com/taosdata/driver-go/blob/main/examples/stmtoverws/main.go)
Set Go proxy `go env -w GOPROXY=https://goproxy.cn,direct`.
## API Reference
......
......@@ -32,24 +32,44 @@ REST 连接支持所有能运行 Go 的平台。
请参考[版本支持列表](https://github.com/taosdata/driver-go#remind)
## 支持的功能特性
## 处理异常
### 原生连接
如果是 TDengine 错误可以通过以下方式获取错误码和错误信息。
“原生连接”指连接器通过 TDengine 客户端驱动(taosc)直接与 TDengine 运行实例建立的连接。支持的功能特性有:
* 普通查询
* 连续查询
* 订阅
* schemaless 接口
* 参数绑定接口
### REST 连接
"REST 连接"指连接器通过 taosAdapter 组件提供的 REST API TDengine 运行实例建立的连接。支持的功能特性有:
```go
// import "github.com/taosdata/driver-go/v3/errors"
if err != nil {
tError, is := err.(*errors.TaosError)
if is {
fmt.Println("errorCode:", int(tError.Code))
fmt.Println("errorMessage:", tError.ErrStr)
} else {
fmt.Println(err.Error())
}
}
```
* 普通查询
* 连续查询
## TDengine DataType Go DataType
| TDengine DataType | Go Type |
|-------------------|-----------|
| TIMESTAMP | time.Time |
| TINYINT | int8 |
| SMALLINT | int16 |
| INT | int32 |
| BIGINT | int64 |
| TINYINT UNSIGNED | uint8 |
| SMALLINT UNSIGNED | uint16 |
| INT UNSIGNED | uint32 |
| BIGINT UNSIGNED | uint64 |
| FLOAT | float32 |
| DOUBLE | float64 |
| BOOL | bool |
| BINARY | string |
| NCHAR | string |
| JSON | []byte |
**注意**JSON 类型仅在 tag 中支持。
## 安装步骤
......@@ -63,11 +83,7 @@ REST 连接支持所有能运行 Go 的平台。
* ```go env```
* ```gcc -v```
### 使用 go get 安装
`go get -u github.com/taosdata/driver-go/v3@latest`
### 使用 go mod 管理
### 安装连接器
1. 使用 `go mod` 命令初始化项目:
......@@ -99,8 +115,6 @@ REST 连接支持所有能运行 Go 的平台。
## 建立连接
### 数据源名称(DSN
数据源名称具有通用格式,例如 [PEAR DB](http://pear.php.net/manual/en/package.database.db.intro-dsn.php),但没有类型前缀(方括号表示可选):
``` text
......@@ -113,9 +127,7 @@ REST 连接支持所有能运行 Go 的平台。
username:password@protocol(address)/dbname?param=value
```
### 使用连接器进行连接
<Tabs defaultValue="rest">
<Tabs defaultValue="rest" groupId="connect">
<TabItem value="native" label="原生连接">
_taosSql_ 通过 cgo 实现了 Go `database/sql/driver` 接口。只需要引入驱动就可以使用 [`database/sql`](https://golang.org/pkg/database/sql/) 的接口。
......@@ -213,332 +225,900 @@ func main() {
</TabItem>
</Tabs>
## 使用示例
### 写入数据
### 指定 URL Properties 获取连接
#### SQL 写入
Go 连接器不支持此功能
<GoInsert />
### 配置参数的优先级
#### InfluxDB 行协议写入
Go 连接器不支持此功能
<GoInfluxLine />
## 使用示例
#### OpenTSDB Telnet 行协议写入
### 创建数据库和表
<GoOpenTSDBTelnet />
```go
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
log.Fatalln("failed to create stable, err:", err)
}
```
#### OpenTSDB JSON 行协议写入
### 插入数据
<GoOpenTSDBJson />
<GoInsert />
### 查询数据
<GoQuery />
### 更多示例程序
* [示例程序](https://github.com/taosdata/driver-go/tree/3.0/examples)
* [视频教程](https://www.taosdata.com/blog/2020/11/11/1951.html)
### 执行带有 reqId SQL
## 使用限制
reqId 可用于请求链路追踪。
由于 REST 接口无状态所以 `use db` 语法不会生效,需要将 db 名称放到 SQL 语句中,如:`create table if not exists tb1 (ts timestamp, a int)`改为`create table if not exists test.tb1 (ts timestamp, a int)`否则将报错`[0x217] Database not specified or available`
```go
db, err := sql.Open("taosSql", "root:taosdata@tcp(localhost:6030)/")
if err != nil {
panic(err)
}
defer db.Close()
ctx := context.WithValue(context.Background(), common.ReqIDKey, common.GetReqID())
_, err = db.ExecContext(ctx, "create database if not exists example_taos_sql")
if err != nil {
panic(err)
}
```
也可以将 db 名称放到 DSN 中,将 `root:taosdata@http(localhost:6041)/` 改为 `root:taosdata@http(localhost:6041)/test`。当指定的 db 不存在时执行 `create database` 语句不会报错,而执行针对该 db 的其他查询或写入操作会报错。
### 通过参数绑定写入数据
完整示例如下:
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
```go
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/test"
taos, err := sql.Open("taosRestful", taosDSN)
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
panic(err)
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
defer db.Close()
_, err = db.Exec("create database if not exists example_stmt")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
panic(err)
}
rows, err := taos.Query("select * from tb1")
_, err = db.Exec("create table if not exists example_stmt.tb1(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
panic(err)
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
stmt := db.InsertStmt()
err = stmt.Prepare("insert into example_stmt.tb1 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
if err != nil {
panic(err)
}
err := rows.Scan(&r.ts, &r.a)
now := time.Now()
params := make([]*param.Param, 14)
params[0] = param.NewParam(2).
AddTimestamp(now, common.PrecisionMilliSecond).
AddTimestamp(now.Add(time.Second), common.PrecisionMilliSecond)
params[1] = param.NewParam(2).AddBool(true).AddNull()
params[2] = param.NewParam(2).AddTinyint(2).AddNull()
params[3] = param.NewParam(2).AddSmallint(3).AddNull()
params[4] = param.NewParam(2).AddInt(4).AddNull()
params[5] = param.NewParam(2).AddBigint(5).AddNull()
params[6] = param.NewParam(2).AddUTinyint(6).AddNull()
params[7] = param.NewParam(2).AddUSmallint(7).AddNull()
params[8] = param.NewParam(2).AddUInt(8).AddNull()
params[9] = param.NewParam(2).AddUBigint(9).AddNull()
params[10] = param.NewParam(2).AddFloat(10).AddNull()
params[11] = param.NewParam(2).AddDouble(11).AddNull()
params[12] = param.NewParam(2).AddBinary([]byte("binary")).AddNull()
params[13] = param.NewParam(2).AddNchar("nchar").AddNull()
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(6).
AddNchar(5)
err = stmt.BindParam(params, paramTypes)
if err != nil {
fmt.Println("scan error:\n", err)
return
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
fmt.Println(r.ts, r.a)
err = stmt.Execute()
if err != nil {
panic(err)
}
err = stmt.Close()
if err != nil {
panic(err)
}
// select * from example_stmt.tb1
}
```
## 常见问题
1. database/sql stmt(参数绑定)相关接口崩溃
REST 不支持参数绑定相关接口,建议使用`db.Exec``db.Query`
2. 使用 `use db` 语句后执行其他语句报错 `[0x217] Database not specified or available`
REST 接口中 SQL 语句的执行无上下文关联,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
3. 使用 taosSql 不报错使用 taosRestful 报错 `[0x217] Database not specified or available`
因为 REST 接口无状态,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
4. `readBufferSize` 参数调大后无明显效果
`readBufferSize` 调大后会减少获取结果时 `syscall` 的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。
5. `disableCompression` 参数设置为 `false` 时查询效率降低
`disableCompression` 参数设置为 `false` 时查询结果会使用 `gzip` 压缩后传输,拿到数据后要先进行 `gzip` 解压。
6. `go get` 命令无法获取包,或者获取包超时
设置 Go 代理 `go env -w GOPROXY=https://goproxy.cn,direct`
## 常用 API
### database/sql API
* `sql.Open(DRIVER_NAME string, dataSourceName string) *DB`
API 用来打开 DB,返回一个类型为 \*DB 的对象。
:::info
API 成功创建的时候,并没有做权限等检查,只有在真正执行 Query 或者 Exec 的时候才能真正的去创建连接,并同时检查 user/password/host/port 是不是合法。
:::
* `func (db *DB) Exec(query string, args ...interface{}) (Result, error)`
`sql.Open` 内置的方法,用来执行非查询相关 SQL
* `func (db *DB) Query(query string, args ...interface{}) (*Rows, error)`
`sql.Open` 内置的方法,用来执行查询语句。
### 高级功能(afAPI
`af` 包封装了连接管理、订阅、schemaless、参数绑定等 TDengine 高级功能。
#### 连接管理
* `af.Open(host, user, pass, db string, port int) (*Connector, error)`
API 通过 cgo 创建与 taosd 的连接。
* `func (conn *Connector) Close() error`
关闭与 taosd 的连接。
#### 订阅
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
创建消费者。
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅单个主题。
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
订阅主题。
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
轮询消息。
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
提交消息。
* `func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)`
获取消费进度。(需要 TDengine >= 3.0.5.0 driver-go >= v3.5.0)
* `func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error`
注意:出于兼容目的保留 `ignoredTimeoutMs` 参数,当前未使用
按照指定的进度消费。(需要 TDengine >= 3.0.5.0 driver-go >= v3.5.0)
* `func (c *Consumer) Close() error`
关闭连接。
#### schemaless
* `func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error`
写入 InfluxDB 行协议。
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
* `func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error`
```go
package main
写入 OpenTDSB telnet 协议数据。
import (
"database/sql"
"fmt"
"time"
* `func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error`
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/stmt"
)
写入 OpenTSDB JSON 协议数据。
func main() {
db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
if err != nil {
panic(err)
}
defer db.Close()
prepareEnv(db)
config := stmt.NewConfig("ws://127.0.0.1:6041/rest/stmt", 0)
config.SetConnectUser("root")
config.SetConnectPass("taosdata")
config.SetConnectDB("example_ws_stmt")
config.SetMessageTimeout(common.DefaultMessageTimeout)
config.SetWriteWait(common.DefaultWriteWait)
config.SetErrorHandler(func(connector *stmt.Connector, err error) {
panic(err)
})
config.SetCloseHandler(func() {
fmt.Println("stmt connector closed")
})
connector, err := stmt.NewConnector(config)
if err != nil {
panic(err)
}
now := time.Now()
{
stmt, err := connector.Init()
if err != nil {
panic(err)
}
err = stmt.Prepare("insert into ? using all_json tags(?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
if err != nil {
panic(err)
}
err = stmt.SetTableName("tb1")
if err != nil {
panic(err)
}
err = stmt.SetTags(param.NewParam(1).AddJson([]byte(`{"tb":1}`)), param.NewColumnType(1).AddJson(0))
if err != nil {
panic(err)
}
params := []*param.Param{
param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
param.NewParam(3).AddBool(true).AddNull().AddBool(true),
param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
param.NewParam(3).AddInt(1).AddNull().AddInt(1),
param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
}
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0)
err = stmt.BindParam(params, paramTypes)
if err != nil {
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Exec()
if err != nil {
panic(err)
}
affected := stmt.GetAffectedRows()
fmt.Println("all_json affected rows:", affected)
err = stmt.Close()
if err != nil {
panic(err)
}
}
{
stmt, err := connector.Init()
if err != nil {
panic(err)
}
err = stmt.Prepare("insert into ? using all_all tags(?,?,?,?,?,?,?,?,?,?,?,?,?,?) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
err = stmt.SetTableName("tb1")
if err != nil {
panic(err)
}
#### 参数绑定
err = stmt.SetTableName("tb2")
if err != nil {
panic(err)
}
err = stmt.SetTags(
param.NewParam(14).
AddTimestamp(now, 0).
AddBool(true).
AddTinyint(2).
AddSmallint(2).
AddInt(2).
AddBigint(2).
AddUTinyint(2).
AddUSmallint(2).
AddUInt(2).
AddUBigint(2).
AddFloat(2).
AddDouble(2).
AddBinary([]byte("tb2")).
AddNchar("tb2"),
param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0),
)
if err != nil {
panic(err)
}
params := []*param.Param{
param.NewParam(3).AddTimestamp(now, 0).AddTimestamp(now.Add(time.Second), 0).AddTimestamp(now.Add(time.Second*2), 0),
param.NewParam(3).AddBool(true).AddNull().AddBool(true),
param.NewParam(3).AddTinyint(1).AddNull().AddTinyint(1),
param.NewParam(3).AddSmallint(1).AddNull().AddSmallint(1),
param.NewParam(3).AddInt(1).AddNull().AddInt(1),
param.NewParam(3).AddBigint(1).AddNull().AddBigint(1),
param.NewParam(3).AddUTinyint(1).AddNull().AddUTinyint(1),
param.NewParam(3).AddUSmallint(1).AddNull().AddUSmallint(1),
param.NewParam(3).AddUInt(1).AddNull().AddUInt(1),
param.NewParam(3).AddUBigint(1).AddNull().AddUBigint(1),
param.NewParam(3).AddFloat(1).AddNull().AddFloat(1),
param.NewParam(3).AddDouble(1).AddNull().AddDouble(1),
param.NewParam(3).AddBinary([]byte("test_binary")).AddNull().AddBinary([]byte("test_binary")),
param.NewParam(3).AddNchar("test_nchar").AddNull().AddNchar("test_nchar"),
}
paramTypes := param.NewColumnType(14).
AddTimestamp().
AddBool().
AddTinyint().
AddSmallint().
AddInt().
AddBigint().
AddUTinyint().
AddUSmallint().
AddUInt().
AddUBigint().
AddFloat().
AddDouble().
AddBinary(0).
AddNchar(0)
err = stmt.BindParam(params, paramTypes)
if err != nil {
panic(err)
}
err = stmt.AddBatch()
if err != nil {
panic(err)
}
err = stmt.Exec()
if err != nil {
panic(err)
}
affected := stmt.GetAffectedRows()
fmt.Println("all_all affected rows:", affected)
err = stmt.Close()
if err != nil {
panic(err)
}
* `func (conn *Connector) StmtExecute(sql string, params *param.Param) (res driver.Result, err error)`
}
}
参数绑定单行插入。
func prepareEnv(db *sql.DB) {
steps := []string{
"create database example_ws_stmt",
"create table example_ws_stmt.all_json(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")" +
"tags(t json)",
"create table example_ws_stmt.all_all(" +
"ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")" +
"tags(" +
"tts timestamp," +
"tc1 bool," +
"tc2 tinyint," +
"tc3 smallint," +
"tc4 int," +
"tc5 bigint," +
"tc6 tinyint unsigned," +
"tc7 smallint unsigned," +
"tc8 int unsigned," +
"tc9 bigint unsigned," +
"tc10 float," +
"tc11 double," +
"tc12 binary(20)," +
"tc13 nchar(20))",
}
for _, step := range steps {
_, err := db.Exec(step)
if err != nil {
panic(err)
}
}
}
* `func (conn *Connector) InsertStmt() *insertstmt.InsertStmt`
```
初始化参数。
</TabItem>
</Tabs>
* `func (stmt *InsertStmt) Prepare(sql string) error`
### 无模式写入
参数绑定预处理 SQL 语句。
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
* `func (stmt *InsertStmt) SetTableName(name string) error`
```go
import (
"fmt"
参数绑定设置表名。
"github.com/taosdata/driver-go/v3/af"
)
* `func (stmt *InsertStmt) SetSubTableName(name string) error`
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
}
defer conn.Close()
_, err = conn.Exec("create database if not exists example")
if err != nil {
panic(err)
}
_, err = conn.Exec("use example")
if err != nil {
panic(err)
}
influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
err = conn.InfluxDBInsertLines([]string{influxdbData}, "ns")
if err != nil {
panic(err)
}
telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
err = conn.OpenTSDBInsertTelnetLines([]string{telnetData})
if err != nil {
panic(err)
}
jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
err = conn.OpenTSDBInsertJsonPayload(jsonData)
if err != nil {
panic(err)
}
}
```
参数绑定设置子表名。
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
* `func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnType) error`
```go
import (
"database/sql"
"log"
"time"
参数绑定多行数据。
"github.com/taosdata/driver-go/v3/common"
_ "github.com/taosdata/driver-go/v3/taosWS"
"github.com/taosdata/driver-go/v3/ws/schemaless"
)
* `func (stmt *InsertStmt) AddBatch() error`
func main() {
db, err := sql.Open("taosWS", "root:taosdata@ws(localhost:6041)/")
if err != nil {
log.Fatal(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists schemaless_ws")
if err != nil {
log.Fatal(err)
}
s, err := schemaless.NewSchemaless(schemaless.NewConfig("ws://localhost:6041/rest/schemaless", 1,
schemaless.SetDb("schemaless_ws"),
schemaless.SetReadTimeout(10*time.Second),
schemaless.SetWriteTimeout(10*time.Second),
schemaless.SetUser("root"),
schemaless.SetPassword("taosdata"),
schemaless.SetErrorHandler(func(err error) {
log.Fatal(err)
}),
))
if err != nil {
panic(err)
}
influxdbData := "st,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000"
telnetData := "stb0_0 1626006833 4 host=host0 interface=eth0"
jsonData := "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}"
添加到参数绑定批处理。
err = s.Insert(influxdbData, schemaless.InfluxDBLineProtocol, "ns", 0, common.GetReqID())
if err != nil {
panic(err)
}
err = s.Insert(telnetData, schemaless.OpenTSDBTelnetLineProtocol, "ms", 0, common.GetReqID())
if err != nil {
panic(err)
}
err = s.Insert(jsonData, schemaless.OpenTSDBJsonFormatProtocol, "ms", 0, common.GetReqID())
if err != nil {
panic(err)
}
}
```
* `func (stmt *InsertStmt) Execute() error`
</TabItem>
</Tabs>
执行参数绑定。
### 执行带有 reqId 的无模式写入
* `func (stmt *InsertStmt) GetAffectedRows() int`
```go
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
```
获取参数绑定插入受影响行数
可以通过 `common.GetReqID()` 获取唯一 id
* `func (stmt *InsertStmt) Close() error`
### 数据订阅
结束参数绑定。
TDengine Go 连接器支持订阅功能,应用 API 如下:
### 通过 WebSocket 订阅
#### 创建 Topic
* `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
```go
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
```
创建消费者。
#### 创建 Consumer
* `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
```
订阅单个主题。
#### 订阅消费数据
* `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
注意:出于兼容目的保留 `rebalanceCb` 参数,当前未使用
```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
```
订阅主题。
#### 指定订阅 Offset
* `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
```go
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
```
轮询消息。
#### 关闭订阅
* `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
注意:出于兼容目的保留 `tmq.TopicPartition` 参数,当前未使用
```go
err = consumer.Close()
if err != nil {
panic(err)
}
```
提交消息。
#### 完整示例
* `func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)`
<Tabs defaultValue="native" groupId="connect">
<TabItem value="native" label="原生连接">
获取消费进度。(需要 TDengine >= 3.0.5.0 driver-go >= v3.5.0)
```go
package main
* `func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error`
注意:出于兼容目的保留 `ignoredTimeoutMs` 参数,当前未使用
import (
"fmt"
"os"
按照指定的进度消费。(需要 TDengine >= 3.0.5.0 driver-go >= v3.5.0)
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)
* `func (c *Consumer) Close() error`
func main() {
db, err := af.Open("", "root", "taosdata", "", 0)
if err != nil {
panic(err)
}
defer db.Close()
_, err = db.Exec("create database if not exists example_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic if not exists example_tmq_topic as DATABASE example_tmq")
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err)
}
_, err = db.Exec("create table example_tmq.t1 (ts timestamp,v int)")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
关闭连接。
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
}
完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/main/examples/tmqoverws/main.go)
err = consumer.Close()
if err != nil {
panic(err)
}
}
```
### 通过 WebSocket 进行参数绑定
</TabItem>
<TabItem value="WebSocket" label="WebSocket 连接">
* `func NewConnector(config *Config) (*Connector, error)`
```go
package main
创建连接。
import (
"database/sql"
"fmt"
* `func (c *Connector) Init() (*Stmt, error)`
"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"github.com/taosdata/driver-go/v3/ws/tmq"
)
初始化参数。
func main() {
db, err := sql.Open("taosRestful", "root:taosdata@http(localhost:6041)/")
if err != nil {
panic(err)
}
defer db.Close()
prepareEnv(db)
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"ws.url": "ws://127.0.0.1:6041/rest/tmq",
"ws.message.channelLen": uint(0),
"ws.message.timeout": common.DefaultMessageTimeout,
"ws.message.writeWait": common.DefaultWriteWait,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"group.id": "example",
"client.id": "example_consumer",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = consumer.Subscribe("example_ws_tmq_topic", nil)
if err != nil {
panic(err)
}
go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
fmt.Printf("get message:%v\n", e)
case tmqcommon.Error:
fmt.Printf("%% Error: %v: %v\n", e.Code(), e)
panic(e)
}
consumer.Commit()
}
}
partitions, err := consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
err = consumer.Seek(tmqcommon.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: 0,
}, 0)
if err != nil {
panic(err)
}
}
* `func (c *Connector) Close() error`
partitions, err = consumer.Assignment()
if err != nil {
panic(err)
}
for i := 0; i < len(partitions); i++ {
fmt.Println(partitions[i])
}
关闭连接。
err = consumer.Close()
if err != nil {
panic(err)
}
}
* `func (s *Stmt) Prepare(sql string) error`
func prepareEnv(db *sql.DB) {
_, err := db.Exec("create database example_ws_tmq WAL_RETENTION_PERIOD 86400")
if err != nil {
panic(err)
}
_, err = db.Exec("create topic example_ws_tmq_topic as database example_ws_tmq")
if err != nil {
panic(err)
}
}
```
参数绑定预处理 SQL 语句。
</TabItem>
</Tabs>
* `func (s *Stmt) SetTableName(name string) error`
### 更多示例程序
参数绑定设置表名。
* [示例程序](https://github.com/taosdata/driver-go/tree/3.0/examples)
* [视频教程](https://www.taosdata.com/blog/2020/11/11/1951.html)
* `func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) error`
## 常见问题
参数绑定设置标签。
1. database/sql stmt(参数绑定)相关接口崩溃
* `func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error`
REST 不支持参数绑定相关接口,建议使用`db.Exec``db.Query`
参数绑定多行数据。
2. 使用 `use db` 语句后执行其他语句报错 `[0x217] Database not specified or available`
* `func (s *Stmt) AddBatch() error`
REST 接口中 SQL 语句的执行无上下文关联,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
添加到参数绑定批处理。
3. 使用 taosSql 不报错使用 taosRestful 报错 `[0x217] Database not specified or available`
* `func (s *Stmt) Exec() error`
因为 REST 接口无状态,使用 `use db` 语句不会生效,解决办法见上方使用限制章节。
执行参数绑定。
4. `readBufferSize` 参数调大后无明显效果
* `func (s *Stmt) GetAffectedRows() int`
`readBufferSize` 调大后会减少获取结果时 `syscall` 的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。
获取参数绑定插入受影响行数。
5. `disableCompression` 参数设置为 `false` 时查询效率降低
* `func (s *Stmt) Close() error`
`disableCompression` 参数设置为 `false` 时查询结果会使用 `gzip` 压缩后传输,拿到数据后要先进行 `gzip` 解压。
结束参数绑定。
6. `go get` 命令无法获取包,或者获取包超时
完整参数绑定示例参见 [GitHub 示例文件](https://github.com/taosdata/driver-go/blob/main/examples/stmtoverws/main.go)
设置 Go 代理 `go env -w GOPROXY=https://goproxy.cn,direct`
## API 参考
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册