// case002.go exercises subscription consumption against a local TDengine
// server and exits non-zero when a consumed row count is unexpected.
package main

import (
	"database/sql/driver"
	"fmt"
	"io"
	"os"
	"time"

	taos "github.com/taosdata/driver-go/v2/af"
)

// Subscribe_check consumes one batch from topic, counts the rows it yields and
// reports whether that count DIFFERS from check.
//
// Note the polarity: it returns true on a mismatch (count != check) and false
// when the counts agree, so callers treat true as failure.
//
// Errors are fatal: a failing Consume terminates the process with exit status
// 3, and a failing row iteration terminates it with exit status 4. Both
// errors are written to stderr.
//
// On the normal path the rows are closed and the function then sleeps for one
// second before returning (presumably to let the subscription's polling
// interval elapse before the next consume — TODO confirm).
func Subscribe_check(topic taos.Subscriber, check int) bool {
	rows, err := topic.Consume()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(3)
	}
	// Registered only after the error check so Close is never scheduled on a
	// rows value that came back together with an error.
	defer func() {
		rows.Close()
		time.Sleep(time.Second)
	}()

	// The scan buffer is reused across iterations; the values themselves are
	// never inspected, only the number of successful Next calls matters.
	values := make([]driver.Value, 2)
	count := 0
	for {
		err := rows.Next(values)
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			// os.Exit does not run deferred calls, so release the rows here.
			rows.Close()
			os.Exit(4)
		}
		count++
	}
	return count != check
}
// main runs the subscription scenario against a server at 127.0.0.1:
//
//  1. recreate database "test" with a table test(ts timestamp, level int) and
//     insert ten rows;
//  2. subscribe to "select ts, level from test" and expect ten rows;
//  3. consume again with nothing new inserted and expect zero rows;
//  4. insert one more row and expect exactly one row;
//  5. unsubscribe with keepProgress=true, subscribe again under the same
//     topic name and expect zero rows.
//
// Exit status: 1 when the connection cannot be opened, 2 when a setup
// statement or a subscribe call fails, 3 when a consumed row count is not the
// expected one (Subscribe_check may itself exit with 3 or 4 on consume/iterate
// errors).
func main() {
	// Explicit int64: the millisecond timestamp does not fit a 32-bit int.
	const ts int64 = 1630461600000
	const query = "select ts, level from test"

	db, err := taos.Open("127.0.0.1", "", "", "", 0)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer db.Close()

	// exit closes the connection explicitly because os.Exit skips deferred
	// calls, then terminates with the given status.
	exit := func(code int) {
		db.Close()
		os.Exit(code)
	}

	// mustExec runs one statement and aborts on failure, so a broken setup is
	// reported where it happens instead of surfacing later as a count mismatch.
	mustExec := func(sqlcmd string) {
		if _, err := db.Exec(sqlcmd); err != nil {
			fmt.Fprintf(os.Stderr, "%s: %v\n", sqlcmd, err)
			exit(2)
		}
	}

	// subscribe opens the "test" topic without restarting it and aborts on
	// failure; an unchecked error here would hand a nil subscriber to
	// Subscribe_check.
	subscribe := func() taos.Subscriber {
		topic, err := db.Subscribe(false, "test", query, time.Second)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			exit(2)
		}
		return topic
	}

	// expect fails the run when the next consume does not yield want rows.
	// Subscribe_check returns true on a mismatch.
	expect := func(topic taos.Subscriber, want int) {
		if Subscribe_check(topic, want) {
			exit(3)
		}
	}

	mustExec("drop database if exists test")
	mustExec("create database if not exists test ")
	mustExec("use test")
	mustExec("create table test (ts timestamp ,level int)")
	for i := 0; i < 10; i++ {
		mustExec(fmt.Sprintf("insert into test values(%d,%d)", ts+int64(i), i))
	}

	fmt.Println("consumption 01.")
	topic := subscribe()
	expect(topic, 10)

	fmt.Println("consumption 02: no new rows inserted")
	expect(topic, 0)

	fmt.Println("consumption 03: after one new rows inserted")
	mustExec(fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10))
	expect(topic, 1)

	fmt.Println("consumption 04: keep progress and continue previous subscription")
	topic.Unsubscribe(true)
	topic = subscribe()
	expect(topic, 0)
}