test_ctaos.py 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
from taos.cinterface import *
from taos.precision import *
from taos.bind import *

import time
import datetime
import pytest

@pytest.fixture
def conn():
    return CTaosInterface().connect()


def test_simple(conn, caplog):
    dbname = "pytest_ctaos_simple"
    try:
        res = taos_query(conn, "create database if not exists %s" % dbname)
        taos_free_result(res)

        taos_select_db(conn, dbname)

        res = taos_query(
            conn,
            "create table if not exists log(ts timestamp, level tinyint, content binary(100), ipaddr binary(134))",
        )
        taos_free_result(res)

        res = taos_query(conn, "insert into log values(now, 1, 'hello', 'test')")
        taos_free_result(res)

        res = taos_query(conn, "select level,content,ipaddr from log limit 1")

        fields = taos_fetch_fields_raw(res)
        field_count = taos_field_count(res)

        fields = taos_fetch_fields(res)
        for field in fields:
            print(field)

        # field_lengths = taos_fetch_lengths(res, field_count)
        # if not field_lengths:
        #     raise "fetch lengths error"

        row = taos_fetch_row_raw(res)
        rowstr = taos_print_row(row, fields, field_count)
        assert rowstr == "1 hello test"

        row, num = taos_fetch_row(res, fields)
        print(row)
        taos_free_result(res)
        taos_query(conn, "drop database if exists " + dbname)
        taos_close(conn)
    except Exception as err:
        taos_query(conn, "drop database if exists " + dbname)
        raise err


def test_stmt(conn, caplog):
    dbname = "pytest_ctaos_stmt"
    try:
        res = taos_query(conn, "drop database if exists %s" % dbname)
        taos_free_result(res)
        res = taos_query(conn, "create database if not exists %s" % dbname)
        taos_free_result(res)

        taos_select_db(conn, dbname)

        res = taos_query(
            conn,
            "create table if not exists log(ts timestamp, nil tinyint, ti tinyint, si smallint, ii int,\
             bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
             ff float, dd double, bb binary(100), nn nchar(100))",
        )
        taos_free_result(res)

        stmt = taos_stmt_init(conn)

        taos_stmt_prepare(stmt, "insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")

        params = new_bind_params(14)
        params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
        params[1].null()
        params[2].tinyint(2)
        params[3].smallint(3)
        params[4].int(4)
        params[5].bigint(5)
        params[6].tinyint_unsigned(6)
        params[7].smallint_unsigned(7)
        params[8].int_unsigned(8)
        params[9].bigint_unsigned(9)
        params[10].float(10.1)
        params[11].double(10.11)
        params[12].binary("hello")
        params[13].nchar("stmt")
        taos_stmt_bind_param(stmt, params)
        taos_stmt_add_batch(stmt)
        taos_stmt_execute(stmt)

        res = taos_query(conn, "select * from log limit 1")

        fields = taos_fetch_fields(res)
        filed_count = taos_field_count(res)

        row = taos_fetch_row_raw(res)
        rowstr = taos_print_row(row, fields, filed_count, 100)

        taos_free_result(res)
        taos_query(conn, "drop database if exists " + dbname)
        taos_close(conn)

        assert rowstr == "1626861392589 NULL 2 3 4 5 6 7 8 9 10.100000 10.110000 hello stmt"
    except Exception as err:
        taos_query(conn, "drop database if exists " + dbname)
        raise err

def stream_callback(param, result, row):
    # type: (c_void_p, c_void_p, c_void_p) -> None
    try:
        if result == None or row == None:
            return
        result = c_void_p(result)
        row = c_void_p(row)
        fields = taos_fetch_fields_raw(result)
        num_fields = taos_field_count(result)
        s = taos_print_row(row, fields, num_fields)
        print(s)
        taos_stop_query(result)
    except Exception as err:
        print(err)

def test_stream(conn, caplog):
    dbname = "pytest_ctaos_stream"
    try:
        res = taos_query(conn, "create database if not exists %s" % dbname)
        taos_free_result(res)

        taos_select_db(conn, dbname)

        res = taos_query(
            conn,
            "create table if not exists log(ts timestamp, n int)",
        )
        taos_free_result(res)

        res = taos_query(conn, "select count(*) from log interval(5s)")
        cc = taos_num_fields(res)
        assert cc == 2

        stream = taos_open_stream(conn, "select count(*) from log interval(5s)", stream_callback, 0, None, None)
        print("waiting for data")
        time.sleep(1)

        for i in range(0, 2):
            res = taos_query(conn, "insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
            taos_free_result(res)
            time.sleep(2)
        taos_close_stream(stream)
        taos_query(conn, "drop database if exists " + dbname)
        taos_close(conn)
    except Exception as err:
        taos_query(conn, "drop database if exists " + dbname)
        raise err