Unverified commit 5823bcfc, authored by Linhe Huo and committed by GitHub

[TD-3048]<feature>: support lines/stream/query_a/stop_query and so on. (#7079)

* [TD-3048]<feature>: support lines/stream/query_a/stop_query and so on.

commit 0edc106a76a95b28e65019c2ee4e4ed84530ad35
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Thu Jul 29 21:13:13 2021 +0800

    doc: improve document for python connector

commit 84915de0831b49c8378a16242dd0dbba8aaf386f
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Thu Jul 29 20:35:45 2021 +0800

    chore: add time elapsed

commit 1e8822e01bda96388a359363776792e261260b88
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Thu Jul 29 20:26:01 2021 +0800

    feat: support multi bind

commit 82d823f6194471fd3324b50f7dfba0ee9f10a7dd
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Thu Jul 29 16:42:05 2021 +0800

    feat: python support for async query and subscribe with callback

commit 8b9d36de2945906821225189cb47958e153d81e2
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Wed Jul 28 18:09:09 2021 +0800

    feat: finish stream and stmt interface

commit bc871aa43e9fc28dd1b3f8784a8ac6ee988564b5
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Mon Jul 26 20:21:27 2021 +0800

    feat: basic full-featured implementations

commit e5f7a5213e9016c377a57485e8d5c12139ce0957
Author: Huo Linhe <linhehuo@gmail.com>
Date:   Fri Jul 23 10:33:14 2021 +0800

    tmp: refactor

* chore: fix insert/line_insert.py error

* [TD-3048]<fix>: fix tests error

* [TD-3049]<feature>: support stop_query in python connector cursor

* [TD-3048]<doc>: improve python connector document

* [TD-3048]<doc>: improve python connection README

* [TD-3048]<hotfix>: fix python2 install and runtime error

* chore: replace insertLines with insert_lines

* chore: fix misspellings

* chore: fix crash gen error in threading

* feat: support __del__ method for simplicity

* test: fix subscription test result check

* chore: compatible with 2.0.20

* chore: fix python connector subscription test case

* [TD-3048]<fix>: fix schemaless insert test

* [TD-3048]<fix>: fix memory leak in crash_gen test case

* [TD-3048]<fix>: minor fix for crash gen memory leak

* [TD-3048]<fix>: set minimal required python3 as 3.4

* chore: update version in setup.py

* [TD-3048]<fix>: fix runtime errors in python3.4

* [TD-3048]<fix>: add typing as a dependency
Parent ba4f1f9e
# TDengine Connector for Python
[TDengine] connector for Python enables python programs to access TDengine, using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
## Install
......@@ -11,8 +12,417 @@ pip install ./TDengine/src/connector/python
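After installing, a quick sanity check is to open a connection and print the client and server versions (a minimal sketch; it assumes a TDengine server is running and reachable with the default client configuration):
```python
import taos

conn = taos.connect()
print(conn.client_info)  # version of the TDengine C client library
print(conn.server_info)  # version of the TDengine server
conn.close()
```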
## Source Code
[TDengine] connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## License - AGPL
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
from random import random
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine).
# encoding:UTF-8
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
\ No newline at end of file
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
# No need to close explicitly, but it is fine if you do
# result.close()
result = conn.query("select * from log")
for row in result:
print(row)
# No need to close explicitly, but it is fine if you do
# result.close()
# stmt.close()
# conn.close()
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
]
conn.insert_lines(lines)
print("inserted")
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
conn.execute("drop database if exists %s" % dbname)
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
# the result must be closed explicitly once fetching has completed, otherwise an error occurs
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
# explicitly close the result when the query failed
result.close()
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
# conn.close()
if __name__ == "__main__":
test_query(connect())
\ No newline at end of file
import taos
conn = taos.connect()
conn.execute("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
conn.execute("drop database pytest")
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
time.sleep(0.7)
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
# conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
import taos
import random
conn = taos.connect()
dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consuming from the latest subscription point needs root privilege (for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition")
for i in range(10):
conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
[tool.poetry]
name = "taos"
version = "2.1.0"
description = "TDengine connector for python"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "AGPL-3.0"
readme = "README.md"
[tool.poetry.dependencies]
python = "^2.7 || ^3.4"
typing = "*"
[tool.poetry.dev-dependencies]
pytest = [
{ version = "^4.6", python = "^2.7" },
{ version = "^6.2", python = "^3.7" }
]
pdoc = { version = "^7.1.1", python = "^3.7" }
mypy = { version = "^0.910", python = "^3.6" }
black = { version = "^21.7b0", python = "^3.6" }
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 119
......@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="taos",
version="2.0.11",
version="2.1.0",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
......
# encoding:UTF-8
"""
# TDengine Connector for Python
from .connection import TDengineConnection
from .cursor import TDengineCursor
[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
# For some reason, the following is needed for VS Code (through PyLance) to
## Install
```sh
git clone --depth 1 https://github.com/taosdata/TDengine.git
pip install ./TDengine/src/connector/python
```
## Source Code
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
from random import random
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine).
"""
from .connection import TaosConnection
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import ProgrammingError
from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
try:
import importlib.metadata
__version__ = importlib.metadata.version("taos")
except:
None
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
paramstyle = "pyformat"
__all__ = [
# functions
"connect",
"new_bind_param",
"new_bind_params",
"new_multi_binds",
"new_multi_bind",
# objects
"TaosBind",
"TaosConnection",
"TaosCursor",
"TaosResult",
"TaosRows",
"TaosRow",
"TaosStmt",
"PrecisionEnum",
]
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
# type: (..., ...) -> TaosConnection
"""Function to return a TDengine connector object
Currently supported keyword parameters:
@dsn: Data source name as string
......@@ -25,4 +483,4 @@ def connect(*args, **kwargs):
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
return TaosConnection(*args, **kwargs)
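For reference, a minimal sketch of calling `connect()` with explicit keyword parameters; the host, user, password, database and port values below are only illustrative, and any omitted parameter falls back to the TDengine client configuration:
```python
from taos import connect

# all keyword arguments are optional
conn = connect(host="localhost", user="root", password="taosdata", database="log", port=6030)
print(conn.server_info)
conn.close()
```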
# encoding:UTF-8
import ctypes
from .constants import FieldType
from .error import *
from .precision import *
from datetime import datetime
from ctypes import *
import sys
_datetime_epoch = datetime.utcfromtimestamp(0)
def _is_not_none(obj):
return obj is not None
class TaosBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_size_t)),
("is_null", POINTER(c_int)),
("is_unsigned", c_int),
("error", POINTER(c_int)),
("u", c_int64),
("allocated", c_int),
]
def null(self):
self.buffer_type = FieldType.C_NULL
self.is_null = pointer(c_int(1))
def bool(self, value):
self.buffer_type = FieldType.C_BOOL
self.buffer = cast(pointer(c_bool(value)), c_void_p)
self.buffer_length = sizeof(c_bool)
def tinyint(self, value):
self.buffer_type = FieldType.C_TINYINT
self.buffer = cast(pointer(c_int8(value)), c_void_p)
self.buffer_length = sizeof(c_int8)
def smallint(self, value):
self.buffer_type = FieldType.C_SMALLINT
self.buffer = cast(pointer(c_int16(value)), c_void_p)
self.buffer_length = sizeof(c_int16)
def int(self, value):
self.buffer_type = FieldType.C_INT
self.buffer = cast(pointer(c_int32(value)), c_void_p)
self.buffer_length = sizeof(c_int32)
def bigint(self, value):
self.buffer_type = FieldType.C_BIGINT
self.buffer = cast(pointer(c_int64(value)), c_void_p)
self.buffer_length = sizeof(c_int64)
def float(self, value):
self.buffer_type = FieldType.C_FLOAT
self.buffer = cast(pointer(c_float(value)), c_void_p)
self.buffer_length = sizeof(c_float)
def double(self, value):
self.buffer_type = FieldType.C_DOUBLE
self.buffer = cast(pointer(c_double(value)), c_void_p)
self.buffer_length = sizeof(c_double)
def binary(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_BINARY
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def timestamp(self, value, precision=PrecisionEnum.Milliseconds):
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000000))
else:
raise PrecisionError("datetime does not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
ts = int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round(value * 1000000))
else:
raise PrecisionError("float timestamps do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
ts = value
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000000))
else:
raise PrecisionError("datetime does not support nanosecond precision")
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(pointer(c_int64(ts)), c_void_p)
self.buffer_length = sizeof(c_int64)
def nchar(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_NCHAR
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
self.buffer_length = sizeof(c_uint8)
def smallint_unsigned(self, value):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer = cast(pointer(c_uint16(value)), c_void_p)
self.buffer_length = sizeof(c_uint16)
def int_unsigned(self, value):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer = cast(pointer(c_uint32(value)), c_void_p)
self.buffer_length = sizeof(c_uint32)
def bigint_unsigned(self, value):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer = cast(pointer(c_uint64(value)), c_void_p)
self.buffer_length = sizeof(c_uint64)
def _datetime_to_timestamp(value, precision):
# type: (datetime | float | int | str | c_int64, PrecisionEnum) -> c_int64
if value is None:
return FieldType.C_BIGINT_NULL
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000000))
else:
raise PrecisionError("datetime does not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
return int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round(value * 1000000))
else:
raise PrecisionError("float timestamps do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
return c_int64(value)
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000000))
else:
raise PrecisionError("datetime does not support nanosecond precision")
elif isinstance(value, c_int64):
return value
return FieldType.C_BIGINT_NULL
class TaosMultiBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_int32)),
("is_null", c_char_p),
("num", c_int),
]
def null(self, num):
self.buffer_type = FieldType.C_NULL
self.is_null = cast((c_char * num)(*[1 for _ in range(num)]), c_char_p)
self.buffer = c_void_p(None)
self.num = num
def bool(self, values):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BOOL_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
self.buffer_type = FieldType.C_BOOL
self.buffer_length = sizeof(c_bool)
def tinyint(self, values):
self.buffer_type = FieldType.C_TINYINT
self.buffer_length = sizeof(c_int8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint(self, values):
self.buffer_type = FieldType.C_SMALLINT
self.buffer_length = sizeof(c_int16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int(self, values):
self.buffer_type = FieldType.C_INT
self.buffer_length = sizeof(c_int32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint(self, values):
self.buffer_type = FieldType.C_BIGINT
self.buffer_length = sizeof(c_int64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def float(self, values):
self.buffer_type = FieldType.C_FLOAT
self.buffer_length = sizeof(c_float)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_float * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_FLOAT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def double(self, values):
self.buffer_type = FieldType.C_DOUBLE
self.buffer_length = sizeof(c_double)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_double * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_DOUBLE_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def binary(self, values):
self.num = len(values)
self.buffer = cast(c_char_p("".join(filter(_is_not_none, values)).encode("utf-8")), c_void_p)
self.length = (c_int * len(values))(*[len(value) if value is not None else 0 for value in values])
self.buffer_type = FieldType.C_BINARY
self.is_null = cast((c_byte * self.num)(*[1 if v == None else 0 for v in values]), c_char_p)
def timestamp(self, values, precision=PrecisionEnum.Milliseconds):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values])
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(buffer, c_void_p)
self.buffer_length = sizeof(c_int64)
self.num = len(values)
def nchar(self, values):
# type: (list[str]) -> None
if sys.version_info < (3, 0):
_bytes = [bytes(value) if value is not None else None for value in values]
buffer_length = max(len(b) + 1 for b in _bytes if b is not None)
buffers = [
create_string_buffer(b, buffer_length) if b is not None else create_string_buffer(buffer_length)
for b in _bytes
]
buffer_all = b''.join(v[:] for v in buffers)
self.buffer = cast(c_char_p(buffer_all), c_void_p)
else:
_bytes = [value.encode("utf-8") if value is not None else None for value in values]
buffer_length = max(len(b) for b in _bytes if b is not None)
self.buffer = cast(
c_char_p(
b"".join(
[
create_string_buffer(b, buffer_length)
if b is not None
else create_string_buffer(buffer_length)
for b in _bytes
]
)
),
c_void_p,
)
self.length = (c_int32 * len(values))(*[len(b) if b is not None else 0 for b in _bytes])
self.buffer_length = buffer_length
self.num = len(values)
self.is_null = cast((c_byte * self.num)(*[1 if v == None else 0 for v in values]), c_char_p)
self.buffer_type = FieldType.C_NCHAR
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint_unsigned(self, values):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer_length = sizeof(c_uint16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int_unsigned(self, values):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer_length = sizeof(c_uint32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint_unsigned(self, values):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer_length = sizeof(c_uint64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def new_bind_param():
# type: () -> TaosBind
return TaosBind()
def new_bind_params(size):
# type: (int) -> Array[TaosBind]
return (TaosBind * size)()
def new_multi_bind():
# type: () -> TaosMultiBind
return TaosMultiBind()
def new_multi_binds(size):
# type: (int) -> Array[TaosMultiBind]
return (TaosMultiBind * size)()
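As a usage note for the bind helpers above: `timestamp()` accepts not only epoch integers but also `datetime` objects and floats, converted against the UTC epoch with the chosen `PrecisionEnum`. A minimal sketch, assuming a statement whose columns are a millisecond-precision timestamp and an int:
```python
from datetime import datetime
from taos import new_bind_params, PrecisionEnum

params = new_bind_params(2)
# the datetime is converted to milliseconds since the UTC epoch
params[0].timestamp(datetime(2021, 7, 21, 10, 36, 32, 589000), PrecisionEnum.Milliseconds)
params[1].int(4)
# pass params to TaosStmt.bind_param() as shown in the README examples
```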
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
class TDengineConnection(object):
""" TDengine connection object
"""
class TaosConnection(object):
"""TDengine connection object"""
def __init__(self, *args, **kwargs):
self._conn = None
......@@ -21,63 +25,130 @@ class TDengineConnection(object):
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
if "host" in kwargs:
self._host = kwargs["host"]
# user
if 'user' in kwargs:
self._user = kwargs['user']
if "user" in kwargs:
self._user = kwargs["user"]
# password
if 'password' in kwargs:
self._password = kwargs['password']
if "password" in kwargs:
self._password = kwargs["password"]
# database
if 'database' in kwargs:
self._database = kwargs['database']
if "database" in kwargs:
self._database = kwargs["database"]
# port
if 'port' in kwargs:
self._port = kwargs['port']
if "port" in kwargs:
self._port = kwargs["port"]
# config
if 'config' in kwargs:
self._config = kwargs['config']
if "config" in kwargs:
self._config = kwargs["config"]
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
"""Close current connection."""
if self._conn:
taos_close(self._conn)
self._conn = None
@property
def client_info(self):
# type: () -> str
return taos_get_client_info()
@property
def server_info(self):
# type: () -> str
return taos_get_server_info(self._conn)
def select_db(self, database):
# type: (str) -> None
taos_select_db(self._conn, database)
def execute(self, sql):
# type: (str) -> None
"""Simplely execute sql ignoring the results"""
res = taos_query(self._conn, sql)
taos_free_result(res)
def query(self, sql):
# type: (str) -> TaosResult
result = taos_query(self._conn, sql)
return TaosResult(result, True, self)
def query_a(self, sql, callback, param):
# type: (str, async_query_callback_type, c_void_p) -> None
"""Asynchronously query a sql with callback function"""
taos_query_a(self._conn, sql, callback, param)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
# type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
"""Create a subscription."""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
return TaosSubscription(sub, callback != None)
def insertLines(self, lines):
"""
insert lines through line protocol
"""
def statement(self, sql=None):
# type: (str | None) -> TaosStmt
if self._conn is None:
return None
return CTaosInterface.insertLines(self._conn, lines)
def cursor(self):
"""Return a new Cursor object using the connection.
stmt = taos_stmt_init(self._conn)
if sql != None:
taos_stmt_prepare(stmt, sql)
return TaosStmt(stmt)
def load_table_info(self, tables):
# type: (str) -> None
taos_load_table_info(self._conn, tables)
def stream(self, sql, callback, stime=0, param=None, callback2=None):
# type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
# cb = cast(callback, stream_callback_type)
# ref = byref(cb)
stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
return TaosStream(stream)
def insert_lines(self, lines):
# type: (list[str]) -> None
"""Line protocol and schemaless support
## Example
```python
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.insert_lines(lines)
```
## Exception
```python
try:
conn.insert_lines(lines)
except SchemalessError as err:
print(err)
```
"""
return TDengineCursor(self)
return taos_insert_lines(self._conn, lines)
def cursor(self):
# type: () -> TaosCursor
"""Return a new Cursor object using the connection."""
return TaosCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
......@@ -87,17 +158,18 @@ class TDengineConnection(object):
pass
def rollback(self):
"""Void functionality
"""
"""Void functionality"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
"""Clear unused result set on this connection."""
pass
def __del__(self):
self.close()
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn = TaosConnection()
conn.close()
print("Hello world")
# encoding:UTF-8
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
"""TDengine Field Types"""
# type_code
C_NULL = 0
C_BOOL = 1
......@@ -34,9 +33,9 @@ class FieldType(object):
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
C_FLOAT_NULL = float("nan")
C_DOUBLE_NULL = float("nan")
C_BINARY_NULL = bytearray([int("0xff", 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
......
from .cinterface import CTaosInterface
# encoding:UTF-8
from .cinterface import *
from .error import *
from .constants import FieldType
from .result import *
# querySeqNum = 0
class TDengineCursor(object):
class TaosCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> name (mandatory)
> type_code (mandatory)
> display_size
> internal_size
> precision
......@@ -55,8 +55,7 @@ class TDengineCursor(object):
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
block, self._block_rows = taos_fetch_row(self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
......@@ -69,20 +68,17 @@ class TDengineCursor(object):
@property
def description(self):
"""Return the description of the object.
"""
"""Return the description of the object."""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
"""Return the rowcount of the object"""
return self._rowcount
@property
def affected_rows(self):
"""Return the rowcount of insertion
"""
"""Return the rowcount of insertion"""
return self._affected_rows
def callproc(self, procname, *args):
......@@ -96,8 +92,7 @@ class TDengineCursor(object):
self._logfile = logfile
def close(self):
"""Close the cursor.
"""
"""Close the cursor."""
if self._connection is None:
return False
......@@ -107,8 +102,7 @@ class TDengineCursor(object):
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
"""Prepare and execute a database operation (query or command)."""
if not operation:
return None
......@@ -124,104 +118,91 @@ class TDengineCursor(object):
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# localSeqNum = querySeqNum # avoid race condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
self._result = taos_query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
if self._logfile:
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
if taos_field_count(self._result) == 0:
affected_rows = taos_affected_rows(self._result)
self._affected_rows += affected_rows
return affected_rows
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
self._fields = taos_fetch_fields(self._result)
return self._handle_result()
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters."""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available."""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
if (self._description[col][1] == FieldType.C_BOOL):
if dataType.upper() == "BOOL":
if self._description[col][1] == FieldType.C_BOOL:
return True
if (dataType.upper() == "TINYINT"):
if (self._description[col][1] == FieldType.C_TINYINT):
if dataType.upper() == "TINYINT":
if self._description[col][1] == FieldType.C_TINYINT:
return True
if (dataType.upper() == "TINYINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
if dataType.upper() == "TINYINT UNSIGNED":
if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED:
return True
if (dataType.upper() == "SMALLINT"):
if (self._description[col][1] == FieldType.C_SMALLINT):
if dataType.upper() == "SMALLINT":
if self._description[col][1] == FieldType.C_SMALLINT:
return True
if (dataType.upper() == "SMALLINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
if dataType.upper() == "SMALLINT UNSIGNED":
if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED:
return True
if (dataType.upper() == "INT"):
if (self._description[col][1] == FieldType.C_INT):
if dataType.upper() == "INT":
if self._description[col][1] == FieldType.C_INT:
return True
if (dataType.upper() == "INT UNSIGNED"):
if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
if dataType.upper() == "INT UNSIGNED":
if self._description[col][1] == FieldType.C_INT_UNSIGNED:
return True
if (dataType.upper() == "BIGINT"):
if (self._description[col][1] == FieldType.C_BIGINT):
if dataType.upper() == "BIGINT":
if self._description[col][1] == FieldType.C_BIGINT:
return True
if (dataType.upper() == "BIGINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
if dataType.upper() == "BIGINT UNSIGNED":
if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED:
return True
if (dataType.upper() == "FLOAT"):
if (self._description[col][1] == FieldType.C_FLOAT):
if dataType.upper() == "FLOAT":
if self._description[col][1] == FieldType.C_FLOAT:
return True
if (dataType.upper() == "DOUBLE"):
if (self._description[col][1] == FieldType.C_DOUBLE):
if dataType.upper() == "DOUBLE":
if self._description[col][1] == FieldType.C_DOUBLE:
return True
if (dataType.upper() == "BINARY"):
if (self._description[col][1] == FieldType.C_BINARY):
if dataType.upper() == "BINARY":
if self._description[col][1] == FieldType.C_BINARY:
return True
if (dataType.upper() == "TIMESTAMP"):
if (self._description[col][1] == FieldType.C_TIMESTAMP):
if dataType.upper() == "TIMESTAMP":
if self._description[col][1] == FieldType.C_TIMESTAMP:
return True
if (dataType.upper() == "NCHAR"):
if (self._description[col][1] == FieldType.C_NCHAR):
if dataType.upper() == "NCHAR":
if self._description[col][1] == FieldType.C_NCHAR:
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation."""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
            block, num_of_fields = taos_fetch_row(self._result, self._fields)
            errno = taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
@@ -230,19 +211,16 @@ class TDengineCursor(object):
return list(map(tuple, zip(*buffer)))
def fetchall(self):
        if self._result is None:
            raise OperationalError("Invalid use of fetchall")
        fields = self._fields if self._fields is not None else taos_fetch_fields(self._result)
        buffer = [[] for i in range(len(fields))]
self._rowcount = 0
while True:
            block, num_of_fields = taos_fetch_block(self._result, self._fields)
            errno = taos_errno(self._result)
            if errno != 0:
                raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
@@ -250,9 +228,12 @@ class TDengineCursor(object):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def stop_query(self):
if self._result != None:
taos_stop_query(self._result)
def nextset(self):
"""
"""
""" """
pass
def setinputsize(self, sizes):
@@ -262,12 +243,11 @@ class TDengineCursor(object):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
"""Reset the result to unused version."""
self._description = []
self._rowcount = -1
if self._result is not None:
            taos_free_result(self._result)
self._result = None
self._fields = None
self._block = None
@@ -276,11 +256,12 @@ class TDengineCursor(object):
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
"""Handle the return result from query."""
self._description = []
for ele in self._fields:
            self._description.append((ele["name"], ele["type"], None, None, None, None, False))
return self._result
def __del__(self):
self.close()
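# Editor's note: a minimal usage sketch for the cursor class above (not part of
# this change set). It assumes a running TDengine server on localhost and the
# built-in `log.log` table created by taosd's monitor feature.
if __name__ == "__main__":
    import taos

    conn = taos.connect()
    cursor = conn.cursor()
    cursor.execute("select * from log.log limit 5")
    # istype() checks a column against the type code kept in cursor.description
    print("column 0 is TIMESTAMP:", cursor.istype(0, "timestamp"))
    for row in cursor.fetchall():
        print(row)
    cursor.stop_query()
    cursor.close()
    conn.close()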
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
    def __cmp__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DateFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = DBAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
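# Editor's note: a small sketch of the PEP-249 type constructors above (not part
# of this change set); ticks are seconds since the Unix epoch.
if __name__ == "__main__":
    ticks = time.time()
    print(DateFromTicks(ticks))       # datetime.date for today
    print(TimeFromTicks(ticks))       # datetime.time for the current time
    print(TimestampFromTicks(ticks))  # datetime.datetime for now
    print(Binary(b"raw bytes"))       # Binary is an alias of bytes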
# encoding:UTF-8
"""Python exceptions
"""
class Error(Exception):
    def __init__(self, msg=None, errno=0xffff):
        self.msg = msg
        self.errno = errno
        self._full_msg = "[0x%04x]: %s" % (self.errno & 0xffff, self.msg)
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
"""Exception raised for important warnings like data truncations while inserting."""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
"""Exception raised for errors that are related to the database interface rather than the database itself."""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
"""Exception raised for errors that are related to the database."""
pass
class ConnectionError(Error):
"""Exceptin raised for connection failed"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range."""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
"""Exception raised when the relational integrity of the database is affected."""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
"""Exception raised when the database encounters an internal error."""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
"""Exception raised for programming errors."""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
"""Exception raised in case a method or database API was used which is not supported by the database,."""
pass
class StatementError(DatabaseError):
"""Exception raised in STMT API."""
pass
class ResultError(DatabaseError):
"""Result related APIs."""
pass
class LinesError(DatabaseError):
"""taos_insert_lines errors."""
pass
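# Editor's note: a quick sketch of how the exception hierarchy above is meant to
# be used (not part of this change set); 0x0216 is a made-up error code here.
if __name__ == "__main__":
    try:
        raise ProgrammingError("invalid SQL", 0x0216)
    except DatabaseError as err:
        # ProgrammingError inherits DatabaseError, which carries the formatted errno
        print(err)  # [0x0216]: invalid SQL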
# encoding:UTF-8
import ctypes
import math
import datetime
from ctypes import *
from .constants import FieldType
from .error import *
_datetime_epoch = datetime.datetime.fromtimestamp(0)
def _convert_millisecond_to_datetime(milli):
return _datetime_epoch + datetime.timedelta(seconds=milli / 1000.0)
def _convert_microsecond_to_datetime(micro):
return _datetime_epoch + datetime.timedelta(seconds=micro / 1000000.0)
def _convert_nanosecond_to_datetime(nanosec):
return nanosec
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
_timestamp_converter = _convert_millisecond_to_datetime
if precision == FieldType.C_TIMESTAMP_MILLI:
_timestamp_converter = _convert_millisecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_MICRO:
_timestamp_converter = _convert_microsecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_NANO:
_timestamp_converter = _convert_nanosecond_to_datetime
else:
raise DatabaseError("Unknown precision returned from database")
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bool_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ubyte))[: abs(num_of_rows)]
]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[: abs(num_of_rows)]
]
def _crow_smallint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ushort))[: abs(num_of_rows)]
]
def _crow_int_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[: abs(num_of_rows)]
]
def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint))[: abs(num_of_rows)]
]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bigint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint64))[: abs(num_of_rows)]
]
def _crow_float_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C float row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[: abs(num_of_rows)]
]
def _crow_double_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C double row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[: abs(num_of_rows)]
]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
return [
None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode("utf-8")
for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[: abs(num_of_rows)]
]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode())
else:
res.append(
(
ctypes.cast(
data + nbytes * i,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4)),
)
)[0].value
)
except ValueError:
res.append(None)
return res
def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
return res
CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
}
CONVERT_FUNC_BLOCK = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python_block,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python_block,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
}
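# Editor's note (not part of this change set): the converters above turn a raw
# C buffer into a Python list, mapping the per-type NULL sentinel to None.
# A self-contained illustration with a fake block of three C ints:
#
#   raw = (ctypes.c_int * 3)(1, FieldType.C_INT_NULL, 3)
#   data = ctypes.cast(raw, ctypes.c_void_p)
#   CONVERT_FUNC[FieldType.C_INT](data, 3)   # -> [1, None, 3]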
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [
("_name", ctypes.c_char * 65),
("_type", ctypes.c_uint8),
("_bytes", ctypes.c_uint16),
]
@property
def name(self):
return self._name.decode("utf-8")
@property
def length(self):
"""alias to self.bytes"""
return self._bytes
@property
def bytes(self):
return self._bytes
@property
def type(self):
return self._type
def __dict__(self):
return {"name": self.name, "type": self.type, "bytes": self.length}
def __str__(self):
return "{name: %s, type: %d, bytes: %d}" % (self.name, self.type, self.length)
def __getitem__(self, item):
return getattr(self, item)
class TaosFields(object):
def __init__(self, fields, count):
if isinstance(fields, c_void_p):
self._fields = cast(fields, POINTER(TaosField))
if isinstance(fields, POINTER(TaosField)):
self._fields = fields
self._count = count
self._iter = 0
def as_ptr(self):
return self._fields
@property
def count(self):
return self._count
@property
def fields(self):
return self._fields
def __next__(self):
return self._next_field()
def next(self):
return self._next_field()
def _next_field(self):
if self._iter < self.count:
field = self._fields[self._iter]
self._iter += 1
return field
else:
raise StopIteration
def __getitem__(self, item):
return self._fields[item]
def __iter__(self):
return self
def __len__(self):
return self.count
class PrecisionEnum(object):
"""Precision enums"""
Milliseconds = 0
Microseconds = 1
Nanoseconds = 2
class PrecisionError(Exception):
"""Python datetime does not support nanoseconds error"""
pass
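# Editor's note: a tiny self-test sketch for the field wrappers above (not part
# of this change set); real field arrays come from taos_fetch_fields(), here one
# is built by hand for illustration.
if __name__ == "__main__":
    raw = (TaosField * 2)()
    raw[0]._name, raw[0]._type, raw[0]._bytes = b"ts", FieldType.C_TIMESTAMP, 8
    raw[1]._name, raw[1]._type, raw[1]._bytes = b"value", FieldType.C_DOUBLE, 8

    fields = TaosFields(cast(raw, POINTER(TaosField)), 2)
    for field in fields:
        print(field)  # e.g. {name: ts, type: ..., bytes: 8}
    print("precision enum:", PrecisionEnum.Milliseconds)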
from .cinterface import *
# from .connection import TaosConnection
from .error import *
class TaosResult(object):
"""TDengine result interface"""
def __init__(self, result, close_after=False, conn=None):
# type: (c_void_p, bool, TaosConnection) -> TaosResult
# to make the __del__ order right
self._conn = conn
self._close_after = close_after
self._result = result
self._fields = None
self._field_count = None
self._precision = None
self._block = None
self._block_length = None
self._row_count = 0
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
# fetch next row
return self._next_row()
def _next_row(self):
if self._result is None or self.fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block == None or self._block_iter >= self._block_length:
self._block, self._block_length = self.fetch_block()
self._block_iter = 0
# self._row_count += self._block_length
raw = self._block[self._block_iter]
self._block_iter += 1
return raw
@property
def fields(self):
"""fields definitions of the current result"""
if self._result is None:
raise ResultError("no result object setted")
if self._fields == None:
self._fields = taos_fetch_fields(self._result)
return self._fields
@property
def field_count(self):
"""Field count of the current result, eq to taos_field_count(result)"""
return self.fields.count
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._row_count
@property
def precision(self):
if self._precision == None:
self._precision = taos_result_precision(self._result)
return self._precision
@property
def affected_rows(self):
return taos_affected_rows(self._result)
# @property
def field_lengths(self):
return taos_fetch_lengths(self._result, self.field_count)
def rows_iter(self, num_of_rows=None):
return TaosRows(self, num_of_rows)
def blocks_iter(self):
return TaosBlocks(self)
def fetch_block(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
block, length = taos_fetch_block_raw(self._result)
if length == 0:
raise StopIteration
precision = self.precision
field_count = self.field_count
fields = self.fields
blocks = [None] * field_count
lengths = self.field_lengths()
for i in range(field_count):
data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC_BLOCK:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = CONVERT_FUNC_BLOCK[fields[i].type](data, length, lengths[i], precision)
return list(map(tuple, zip(*blocks))), length
def fetch_all(self):
if self._result is None:
raise OperationalError("Invalid use of fetchall")
if self._fields == None:
self._fields = taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))]
self._row_count = 0
while True:
block, num_of_fields = taos_fetch_block(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._row_count += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetch_rows_a(self, callback, param):
taos_fetch_rows_a(self._result, callback, param)
def stop_query(self):
return taos_stop_query(self._result)
def errno(self):
"""**DO NOT** use this directly unless you know what you are doing"""
return taos_errno(self._result)
def errstr(self):
return taos_errstr(self._result)
def check_error(self, errno=None, close=True):
if errno == None:
errno = self.errno()
if errno != 0:
msg = self.errstr()
self.close()
raise OperationalError(msg, errno)
def close(self):
"""free result object."""
if self._result != None and self._close_after:
taos_free_result(self._result)
self._result = None
self._fields = None
self._field_count = None
self._field_lengths = None
def __del__(self):
self.close()
class TaosRows:
"""TDengine result rows iterator"""
def __init__(self, result, num_of_rows=None):
self._result = result
self._num_of_rows = num_of_rows
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
return self._next_row()
def _next_row(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
if self._num_of_rows != None and self._num_of_rows <= self._result._row_count:
raise StopIteration
row = taos_fetch_row_raw(self._result._result)
if not row:
raise StopIteration
self._result._row_count += 1
return TaosRow(self._result, row)
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._result._row_count
class TaosRow:
def __init__(self, result, row):
self._result = result
self._row = row
def __str__(self):
return taos_print_row(self._row, self._result.fields, self._result.field_count)
def __call__(self):
return self.as_tuple()
def _astuple(self):
return self.as_tuple()
    def __iter__(self):
        return iter(self.as_tuple())
def as_ptr(self):
return self._row
def as_tuple(self):
precision = self._result.precision
field_count = self._result.field_count
blocks = [None] * field_count
fields = self._result.fields
field_lens = self._result.field_lengths()
for i in range(field_count):
data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = None
else:
blocks[i] = CONVERT_FUNC[fields[i].type](data, 1, field_lens[i], precision)[0]
return tuple(blocks)
class TaosBlocks:
"""TDengine result blocks iterator"""
def __init__(self, result):
self._result = result
def __iter__(self):
return self
def __next__(self):
return self._result.fetch_block()
def next(self):
return self._result.fetch_block()
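# Editor's note: a usage sketch for the result wrappers above (not part of this
# change set). It assumes a local TDengine server with the monitor `log.log`
# table; rows_iter(), blocks_iter() and fetch_all() expose the same data.
if __name__ == "__main__":
    import taos

    conn = taos.connect()
    result = conn.query("select * from log.log limit 10")
    print(*result.fields)
    for row in result.rows_iter():   # row-by-row via taos_fetch_row_raw
        print(row)
    result.close()

    result = conn.query("select * from log.log limit 10")
    print(result.fetch_all())        # whole result as a list of tuples
    result.close()
    conn.close()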
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStmt(object):
"""TDengine STMT interface"""
def __init__(self, stmt, conn = None):
self._conn = conn
self._stmt = stmt
def set_tbname(self, name):
"""Set table name if needed.
        Note that the set_tbname* methods should only be used in insert statements.
"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname(self._stmt, name)
def prepare(self, sql):
# type: (str) -> None
taos_stmt_prepare(self._stmt, sql)
def set_tbname_tags(self, name, tags):
# type: (str, Array[TaosBind]) -> None
"""Set table name with tags, tags is array of BindParams"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname_tags(self._stmt, name, tags)
def bind_param(self, params, add_batch=True):
# type: (Array[TaosBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param(self._stmt, params)
if add_batch:
taos_stmt_add_batch(self._stmt)
def bind_param_batch(self, binds, add_batch=True):
# type: (Array[TaosMultiBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param_batch(self._stmt, binds)
if add_batch:
taos_stmt_add_batch(self._stmt)
def add_batch(self):
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_add_batch(self._stmt)
def execute(self):
if self._stmt is None:
raise StatementError("Invalid use of execute")
taos_stmt_execute(self._stmt)
def use_result(self):
result = taos_stmt_use_result(self._stmt)
return TaosResult(result)
def close(self):
"""Close stmt."""
if self._stmt is None:
return
taos_stmt_close(self._stmt)
self._stmt = None
def __del__(self):
self.close()
if __name__ == "__main__":
from taos.connection import TaosConnection
conn = TaosConnection()
stmt = conn.statement("select * from log.log limit 10")
stmt.execute()
result = stmt.use_result()
for row in result:
print(row)
stmt.close()
conn.close()
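# Editor's note: an insert-style sketch for the statement wrapper above (not
# part of this change set); it assumes a table `log.tb1(ts timestamp, n int)`
# already exists on a local server.
if __name__ == "__main__":
    import taos
    from taos import new_bind_params, PrecisionEnum

    conn = taos.connect(database="log")
    stmt = conn.statement("insert into tb1 values(?, ?)")
    params = new_bind_params(2)
    params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
    params[1].int(42)
    stmt.bind_param(params)  # adds the bound row to the batch by default
    stmt.execute()
    stmt.close()
    conn.close()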
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStream(object):
"""TDengine Stream interface"""
def __init__(self, stream):
self._raw = stream
def as_ptr(self):
return self._raw
def close(self):
"""Close stmt."""
if self._raw is not None:
taos_close_stream(self._raw)
self._raw = None
def __del__(self):
self.close()
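# Editor's note: a minimal sketch of opening and closing a continuous query with
# the wrapper above (not part of this change set); the callback signature follows
# the stream test case later in this change set, `param` is assumed optional, and
# the monitor `log.log` table is assumed to exist.
if __name__ == "__main__":
    import time
    import taos

    def on_row(p_param, p_result, p_row):
        if p_result is None or p_row is None:
            return
        result = TaosResult(p_result)
        print(TaosRow(result, p_row))

    conn = taos.connect()
    stream = conn.stream("select count(*) from log.log interval(5s)", on_row)
    time.sleep(10)   # let the continuous query fire at least once
    stream.close()
    conn.close()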
from taos.result import TaosResult
from .cinterface import *
from .error import *
class TaosSubscription(object):
    """TDengine subscription object"""
    def __init__(self, sub, with_callback=False):
        self._sub = sub
        self._with_callback = with_callback
def consume(self):
"""Consume rows of a subscription
"""
"""Consume rows of a subscription"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
if self._with_callback:
raise OperationalError("DONOT use consume method in an subscription with callback")
result = taos_consume(self._sub)
return TaosResult(result)
def close(self, keepProgress=True):
"""Close the Subscription.
"""
"""Close the Subscription."""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
taos_unsubscribe(self._sub, keepProgress)
self._sub = None
return True
def __del__(self):
self.close()
if __name__ == "__main__":
from .connection import TaosConnection
    conn = TaosConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
......
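# Editor's note: a polling-style sketch for the subscription wrapper above (not
# part of this change set); the subscribe() arguments (restart, topic, sql,
# interval in ms) follow the demo above, and `test.meters` is assumed to exist.
if __name__ == "__main__":
    import taos

    conn = taos.connect(database="test")
    sub = conn.subscribe(True, "sub_demo", "select * from meters;", 1000)
    for _ in range(3):
        result = sub.consume()      # returns a TaosResult with newly arrived rows
        for row in result:
            print(row)
    sub.close(True)                 # True keeps the subscription progress
    conn.close()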
class TimestampType(object):
"""Choose which type that parsing TDengine timestamp data to
- DATETIME: use python datetime.datetime, note that it does not support nanosecond precision,
and python taos will use raw c_int64 as a fallback for nanosecond results.
- NUMPY: use numpy.datetime64 type.
- RAW: use raw c_int64.
- TAOS: use taos' TaosTimestamp.
"""
    DATETIME = 0
    NUMPY = 1
    RAW = 2
    TAOS = 3
class TaosTimestamp:
pass
from taos.cinterface import *
from taos.precision import *
from taos.bind import *
import time
import datetime
import pytest
@pytest.fixture
def conn():
return CTaosInterface().connect()
def test_simple(conn, caplog):
dbname = "pytest_ctaos_simple"
try:
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, level tinyint, content binary(100), ipaddr binary(134))",
)
taos_free_result(res)
res = taos_query(conn, "insert into log values(now, 1, 'hello', 'test')")
taos_free_result(res)
res = taos_query(conn, "select level,content,ipaddr from log limit 1")
fields = taos_fetch_fields_raw(res)
field_count = taos_field_count(res)
fields = taos_fetch_fields(res)
for field in fields:
print(field)
# field_lengths = taos_fetch_lengths(res, field_count)
# if not field_lengths:
# raise "fetch lengths error"
row = taos_fetch_row_raw(res)
rowstr = taos_print_row(row, fields, field_count)
assert rowstr == "1 hello test"
row, num = taos_fetch_row(res, fields)
print(row)
taos_free_result(res)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
def test_stmt(conn, caplog):
dbname = "pytest_ctaos_stmt"
try:
res = taos_query(conn, "drop database if exists %s" % dbname)
taos_free_result(res)
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100))",
)
taos_free_result(res)
stmt = taos_stmt_init(conn)
taos_stmt_prepare(stmt, "insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(14)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].null()
params[2].tinyint(2)
params[3].smallint(3)
params[4].int(4)
params[5].bigint(5)
params[6].tinyint_unsigned(6)
params[7].smallint_unsigned(7)
params[8].int_unsigned(8)
params[9].bigint_unsigned(9)
params[10].float(10.1)
params[11].double(10.11)
params[12].binary("hello")
params[13].nchar("stmt")
taos_stmt_bind_param(stmt, params)
taos_stmt_add_batch(stmt)
taos_stmt_execute(stmt)
res = taos_query(conn, "select * from log limit 1")
fields = taos_fetch_fields(res)
        field_count = taos_field_count(res)
        row = taos_fetch_row_raw(res)
        rowstr = taos_print_row(row, fields, field_count, 100)
taos_free_result(res)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
assert rowstr == "1626861392589 NULL 2 3 4 5 6 7 8 9 10.100000 10.110000 hello stmt"
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
def stream_callback(param, result, row):
# type: (c_void_p, c_void_p, c_void_p) -> None
try:
if result == None or row == None:
return
result = c_void_p(result)
row = c_void_p(row)
fields = taos_fetch_fields_raw(result)
num_fields = taos_field_count(result)
s = taos_print_row(row, fields, num_fields)
print(s)
taos_stop_query(result)
except Exception as err:
print(err)
def test_stream(conn, caplog):
dbname = "pytest_ctaos_stream"
try:
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, n int)",
)
taos_free_result(res)
res = taos_query(conn, "select count(*) from log interval(5s)")
cc = taos_num_fields(res)
assert cc == 2
stream = taos_open_stream(conn, "select count(*) from log interval(5s)", stream_callback, 0, None, None)
print("waiting for data")
time.sleep(1)
for i in range(0, 2):
res = taos_query(conn, "insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
taos_free_result(res)
time.sleep(2)
taos_close_stream(stream)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
from taos.cinterface import *
from taos import *
import pytest
@pytest.fixture
def conn():
return connect()
def test_client_info():
print(taos_get_client_info())
None
def test_server_info(conn):
# type: (TaosConnection) -> None
print(conn.client_info)
print(conn.server_info)
None
if __name__ == "__main__":
test_client_info()
test_server_info(connect())
from taos.error import OperationalError
from taos import connect, new_bind_params, PrecisionEnum
from taos import *
from ctypes import *
import taos
import pytest
@pytest.fixture
def conn():
# type: () -> taos.TaosConnection
return connect()
def test_insert_lines(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_insert_lines"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
result = conn.query("select * from st")
print(*result.fields)
all = result.rows_iter()
for row in all:
print(row)
result.close()
print(result.row_count)
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
raise err
if __name__ == "__main__":
test_insert_lines(connect())
from datetime import datetime
import taos
import pytest
@pytest.fixture
def conn():
return taos.connect()
def test_query(conn):
"""This test will use fetch_block for rows fetching, significantly faster than rows_iter"""
result = conn.query("select * from log.log limit 10000")
fields = result.fields
for field in fields:
print("field: %s" % field)
start = datetime.now()
for row in result:
# print(row)
None
end = datetime.now()
elapsed = end - start
print("elapsed time: ", elapsed)
result.close()
conn.close()
def test_query_row_iter(conn):
"""This test will use fetch_row for each row fetching, this is the only way in async callback"""
result = conn.query("select * from log.log limit 10000")
fields = result.fields
for field in fields:
print("field: %s" % field)
start = datetime.now()
for row in result.rows_iter():
# print(row)
None
end = datetime.now()
elapsed = end - start
print("elapsed time: ", elapsed)
result.close()
conn.close()
if __name__ == "__main__":
test_query(taos.connect(database = "log"))
test_query_row_iter(taos.connect(database = "log"))
from taos import *
from ctypes import *
import taos
import pytest
import time
@pytest.fixture
def conn():
return taos.connect()
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(taos.connect())
from taos.cinterface import *
from taos.precision import *
from taos.bind import *
from taos import *
from ctypes import *
import time
import pytest
@pytest.fixture
def conn():
return connect()
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
@@ -42,18 +42,18 @@ class TDTestCase:
"stf,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641000000ns"
]
        code = self._conn.insert_lines(lines)
        print("insert_lines result {}".format(code))
lines2 = [ "stg,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1626006833639000000ns",
"stg,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1626006833640000000ns"
]
        code = self._conn.insert_lines([ lines2[0] ])
        print("insert_lines result {}".format(code))

        self._conn.insert_lines([ lines2[1] ])
        print("insert_lines result {}".format(code))
tdSql.query("select * from st")
tdSql.checkRows(4)
@@ -73,7 +73,7 @@ class TDTestCase:
tdSql.query("describe stf")
tdSql.checkData(2, 2, 14)
        self._conn.insert_lines([
"sth,t1=4i64,t2=5f64,t4=5f64,ID=\"childtable\" c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933641ms",
"sth,t1=4i64,t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin_stf\",c2=false,c5=5f64,c6=7u64 1626006933654ms"
])
......
@@ -29,9 +29,10 @@ class TDSub:
self.sub.close(keepProgress)
def consume(self):
        self.result = self.sub.consume()
        self.result.fetch_all()
        self.consumedRows = self.result.row_count
        self.consumedCols = self.result.field_count
return self.consumedRows
def checkRows(self, expectRows):
......