case002.go 1.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
package main

import (
	"database/sql/driver"
	"fmt"
	"io"
	"os"
	"time"

	taos "github.com/taosdata/driver-go/v2/af"
)

func Subscribe_check(topic taos.Subscriber, check int) bool {
	count := 0
	rows, err := topic.Consume()
	defer func() { rows.Close(); time.Sleep(time.Second) }()
	if err != nil {
		fmt.Println(err)
		os.Exit(3)
	}
	for {
		values := make([]driver.Value, 2)
		err := rows.Next(values)
		if err == io.EOF {
			break
		} else if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(4)
		}
		count++
	}
	if count == check {
		return false
	} else {
		return true
	}
}
func main() {
	ts := 1630461600000
	db, err := taos.Open("127.0.0.1", "", "", "", 0)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer db.Close()
46 47
	db.Exec("drop database if exists test")
	db.Exec("create database if not exists test ")
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
	db.Exec("use test")
	db.Exec("create table test (ts timestamp ,level int)")
	for i := 0; i < 10; i++ {
		sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i)
		db.Exec(sqlcmd)
	}

	fmt.Println("consumption 01.")
	topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second)
	if Subscribe_check(topic, 10) {
		os.Exit(3)
	}

	fmt.Println("consumption 02: no new rows inserted")
	if Subscribe_check(topic, 0) {
		os.Exit(3)
	}

	fmt.Println("consumption 03: after one new rows inserted")
	sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10)
	db.Exec(sqlcmd)
	if Subscribe_check(topic, 1) {
		os.Exit(3)
	}

	fmt.Println("consumption 04: keep progress and continue previous subscription")
	topic.Unsubscribe(true)
	topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second)
	if Subscribe_check(topic, 0) {
		os.Exit(3)
	}

}