test_subscribe.py 2.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
from taos.subscription import TaosSubscription
from taos import *
from ctypes import *
import taos
import pytest
import time
from random import random


@pytest.fixture
def conn():
    return taos.connect()


def test_subscribe(conn):
    # type: (TaosConnection) -> None

    dbname = "pytest_taos_subscribe_callback"
    try:
        conn.execute("drop database if exists %s" % dbname)
        conn.execute("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.execute("create table if not exists log(ts timestamp, n int)")
        for i in range(10):
            conn.execute("insert into log values(now, %d)" % i)

        sub = conn.subscribe(True, "test", "select * from log", 1000)
        print("# consume from begin")
        for ts, n in sub.consume():
            print(ts, n)
        
        print("# consume new data")
        for i in range(5):
            conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
            result = sub.consume()
            for ts, n in result:
                print(ts, n)
        
        print("# consume with a stop condition")
        for i in range(10):
            conn.execute("insert into log values(now, %d)" % int(random() * 10))
            result = sub.consume()
            try:
                ts, n = next(result)
                print(ts, n)
                if n > 5:
                    result.stop_query()
                    print("## stopped")
                    break
            except StopIteration:
                continue

        sub.close()

        conn.execute("drop database if exists %s" % dbname)
        conn.close()
    except Exception as err:
        conn.execute("drop database if exists %s" % dbname)
        conn.close()
        raise err


def subscribe_callback(p_sub, p_result, p_param, errno):
    # type: (c_void_p, c_void_p, c_void_p, c_int) -> None
    print("callback")
    result = TaosResult(p_result)
    result.check_error(errno)
    for row in result.rows_iter():
        ts, n = row()
        print(ts, n)


def test_subscribe_callback(conn):
    # type: (TaosConnection) -> None
    dbname = "pytest_taos_subscribe_callback"
    try:
        conn.execute("drop database if exists %s" % dbname)
        conn.execute("create database if not exists %s" % dbname)
        conn.select_db(dbname)
        conn.execute("create table if not exists log(ts timestamp, n int)")

        print("# subscribe with callback")
        sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)

        for i in range(10):
            conn.execute("insert into log values(now, %d)" % i)
            time.sleep(0.7)
        sub.close()

        conn.execute("drop database if exists %s" % dbname)
        conn.close()
    except Exception as err:
        conn.execute("drop database if exists %s" % dbname)
        conn.close()
        raise err


if __name__ == "__main__":
    test_subscribe(taos.connect())
    test_subscribe_callback(taos.connect())