subscribe-async.py 1.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
from taos import *
from ctypes import *

import time


def subscribe_callback(p_sub, p_result, p_param, errno):
    # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
    print("# fetch in callback")
10
    result = TaosResult(c_void_p(p_result))
11 12 13 14 15 16 17 18 19 20
    result.check_error(errno)
    for row in result.rows_iter():
        ts, n = row()
        print(ts, n)


def test_subscribe_callback(conn):
    # type: (TaosConnection) -> None
    dbname = "pytest_taos_subscribe_callback"
    try:
21
        print("drop if exists")
22
        conn.execute("drop database if exists %s" % dbname)
23
        print("create database")
24
        conn.execute("create database if not exists %s" % dbname)
25 26 27
        print("create table")
        # conn.execute("use %s" % dbname)
        conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
28 29

        print("# subscribe with callback")
30
        sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
31 32

        for i in range(10):
33
            conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
34
            time.sleep(0.7)
35
        sub.close()
36 37 38 39 40 41 42 43 44 45 46

        conn.execute("drop database if exists %s" % dbname)
        # conn.close()
    except Exception as err:
        conn.execute("drop database if exists %s" % dbname)
        # conn.close()
        raise err


if __name__ == "__main__":
    test_subscribe_callback(connect())