Unverified commit 0f1fefdb, authored by sangshuduo, committed by GitHub

[TD-14358]<fix>: remove python connector from TDengine (#11066)

* [TD-14358]<fix>: remove python connector from TDengine

* [TD-14358]<fix>: use standalone python connector

* add python clone in TDinternal

* git clean before checkout
Parent 40bd5691
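For orientation, here is a minimal sketch (not part of the commit) of the workflow the message above describes: the connector is now installed from PyPI as the standalone `taospy` package, as in the Jenkinsfile change below, instead of from the in-tree `src/connector/python` sources; the import name remains `taos`.

```python
# Assumes a reachable TDengine server/client and a prior `pip3 install taospy`.
import taos

conn = taos.connect()    # default host/user/password from the local client config
print(conn.server_info)  # smoke test: print the server version string
conn.close()
```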
@@ -116,7 +116,7 @@ def pre_test(){
     make > /dev/null
     make install > /dev/null
     cd ${WKC}/tests
-    pip3 install ${WKC}/src/connector/python/
+    pip3 install taospy
     '''
     return 1
 }
......
@@ -36,6 +36,7 @@ def sync_source() {
     } else {
         sh '''
         cd ${WKC}
+        git clean -fxd
         git checkout develop
         '''
     }
@@ -83,6 +84,16 @@ def sync_source() {
         cd ${WKC}
         git fetch origin +refs/pull/${CHANGE_ID}/merge
         git checkout -qf FETCH_HEAD
+        if [ ! -d src/connector/python/.github ]; then
+            rm -rf src/connector/python/* || :
+            rm -rf src/connector/python/.* || :
+            git clone --depth 1 https://github.com/taosdata/taos-connector-python src/connector/python || echo "failed to clone python connector"
+        else
+            cd src/connector/python || echo "src/connector/python not exist"
+            git pull || :
+            cd ${WKC}
+        fi
         '''
     } else if (env.CHANGE_URL =~ /\/TDinternal\//) {
         sh '''
@@ -90,6 +101,16 @@ def sync_source() {
         cd ${WK}
         git fetch origin +refs/pull/${CHANGE_ID}/merge
         git checkout -qf FETCH_HEAD
+        if [ ! -d community/src/connector/python/.github ]; then
+            rm -rf community/src/connector/python/* || :
+            rm -rf community/src/connector/python/.* || :
+            git clone --depth 1 https://github.com/taosdata/taos-connector-python community/src/connector/python || echo "failed to clone python connector"
+        else
+            cd community/src/connector/python || echo "community/src/connector/python not exist"
+            git pull || :
+            cd ${WK}
+        fi
         '''
     } else {
         sh '''
......
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
@@ -17,12 +17,6 @@ Or with git url:
 pip install git+https://github.com/taosdata/taos-connector-python.git
 ```
-If you have installed TDengine server or client with prebuilt packages, then you can install the connector from path:
-```bash
-pip install /usr/local/taos/connector/python
-```
 ## Source Code
 [TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/taos-connector-python).
@@ -361,71 +355,6 @@ if __name__ == "__main__":
     test_subscribe_callback(connect())
 ```
### Stream
```python
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
......
# encoding:UTF-8
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
\ No newline at end of file
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
# No need to explicitly close, but ok for you
# result.close()
result = conn.query("select * from log")
for row in result:
print(row)
# No need to explicitly close, but ok for you
# result.close()
# stmt.close()
# conn.close()
import taos
conn = taos.connect(host='127.0.0.1',
user='root',
password='taosdata',
database='log')
cursor = conn.cursor()
sql = "select * from log.log limit 10"
cursor.execute(sql)
for row in cursor:
print(row)
import taos
from taos import SmlProtocol, SmlPrecision
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
print("inserted")
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
result = conn.query("show tables")
for row in result:
print(row)
conn.execute("drop database if exists %s" % dbname)
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
# the result must be closed explicitly once fetching completes, otherwise an error occurs
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result is None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
# explicitly close the result when the query failed
result.close()
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
# conn.close()
if __name__ == "__main__":
test_query(connect())
\ No newline at end of file
import taos
conn = taos.connect()
conn.execute("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
conn.execute("drop database pytest")
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(c_void_p(p_result))
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
print("drop if exists")
conn.execute("drop database if exists %s" % dbname)
print("create database")
conn.execute("create database if not exists %s" % dbname)
print("create table")
# conn.execute("use %s" % dbname)
conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname)
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into %s.log values(now, %d)" % (dbname, i))
time.sleep(0.7)
sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
# conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
import taos
import random
conn = taos.connect()
dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consume from latest subscription needs root privilege(for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition")
for i in range(10):
conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
[tool.poetry]
name = "taospy"
version = "2.1.2"
description = "TDengine connector for python"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "AGPL-3.0"
readme = "README.md"
packages = [
{include = "taos"}
]
[tool.poetry.dependencies]
python = "^2.7 || ^3.4"
typing = "*"
[tool.poetry.dev-dependencies]
pytest = [
{ version = "^4.6", python = ">=2.7,<3.0" },
{ version = "^6.2", python = ">=3.7,<4.0" }
]
pdoc = { version = "^7.1.1", python = "^3.7" }
mypy = { version = "^0.910", python = "^3.6" }
black = [{ version = "^21.*", python = ">=3.6.2,<4.0" }]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 119
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.1.1",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/taosdata/TDengine/tree/develop/src/connector/python",
packages=setuptools.find_packages(),
classifiers=[
"Environment :: Console",
"Environment :: MacOS X",
"Environment :: Win32 (MS Windows)",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: MacOS",
"Programming Language :: Python :: 2.7",
"Operating System :: Linux",
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Operating System :: Microsoft :: Windows :: Windows 10",
],
)
# encoding:UTF-8
"""
# TDengine Connector for Python
[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API compliant with the Python DB API 2.0 (PEP-249). It uses the TDengine C client library for client/server communication.
## Install
```sh
git clone --depth 1 https://github.com/taosdata/TDengine.git
pip install ./TDengine/src/connector/python
```
## Source Code
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with object-oriented API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result is None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
from random import random  # used by the stop-condition loop below
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
import time
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result is None or p_row is None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Same license as [TDengine](https://github.com/taosdata/TDengine).
"""
from .connection import TaosConnection
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
from .schemaless import *
from taos._version import __version__
# Globals
threadsafety = 0
paramstyle = "pyformat"
__all__ = [
"__version__",
# functions
"connect",
"new_bind_param",
"new_bind_params",
"new_multi_binds",
"new_multi_bind",
# objects
"TaosBind",
"TaosConnection",
"TaosCursor",
"TaosResult",
"TaosRows",
"TaosRow",
"TaosStmt",
"PrecisionEnum",
"SmlPrecision",
"SmlProtocol"
]
def connect(*args, **kwargs):
# type: (..., ...) -> TaosConnection
"""Function to return a TDengine connector object
Currently supported keyword parameters:
@dsn: Data source name as string
@user: User name as string (optional)
@password: Password as string (optional)
@host: Hostname (optional)
@database: Database name (optional)
@rtype: TaosConnection
"""
return TaosConnection(*args, **kwargs)
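A small usage sketch for the keyword parameters documented above; all values below are placeholders for a local default install.

```python
import taos

conn = taos.connect(
    host="localhost",     # placeholder hostname
    user="root",          # default user
    password="taosdata",  # default password
    database="log",       # optional default database
)
print(conn.client_info, conn.server_info)
conn.close()
```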
# encoding:UTF-8
import ctypes
from .constants import FieldType
from .error import *
from .precision import *
from datetime import datetime
from ctypes import *
import sys
_datetime_epoch = datetime.utcfromtimestamp(0)
def _is_not_none(obj):
return obj != None
class TaosBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_size_t)),
("is_null", POINTER(c_int)),
("is_unsigned", c_int),
("error", POINTER(c_int)),
("u", c_int64),
("allocated", c_int),
]
def null(self):
self.buffer_type = FieldType.C_NULL
self.is_null = pointer(c_int(1))
def bool(self, value):
self.buffer_type = FieldType.C_BOOL
self.buffer = cast(pointer(c_bool(value)), c_void_p)
self.buffer_length = sizeof(c_bool)
def tinyint(self, value):
self.buffer_type = FieldType.C_TINYINT
self.buffer = cast(pointer(c_int8(value)), c_void_p)
self.buffer_length = sizeof(c_int8)
def smallint(self, value):
self.buffer_type = FieldType.C_SMALLINT
self.buffer = cast(pointer(c_int16(value)), c_void_p)
self.buffer_length = sizeof(c_int16)
def int(self, value):
self.buffer_type = FieldType.C_INT
self.buffer = cast(pointer(c_int32(value)), c_void_p)
self.buffer_length = sizeof(c_int32)
def bigint(self, value):
self.buffer_type = FieldType.C_BIGINT
self.buffer = cast(pointer(c_int64(value)), c_void_p)
self.buffer_length = sizeof(c_int64)
def float(self, value):
self.buffer_type = FieldType.C_FLOAT
self.buffer = cast(pointer(c_float(value)), c_void_p)
self.buffer_length = sizeof(c_float)
def double(self, value):
self.buffer_type = FieldType.C_DOUBLE
self.buffer = cast(pointer(c_double(value)), c_void_p)
self.buffer_length = sizeof(c_double)
def binary(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_BINARY
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def timestamp(self, value, precision=PrecisionEnum.Milliseconds):
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
ts = int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round(value * 10000000))
else:
raise PrecisionError("time float do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
ts = value
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
ts = int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round(value * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(pointer(c_int64(ts)), c_void_p)
self.buffer_length = sizeof(c_int64)
def nchar(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_NCHAR
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def json(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_JSON
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
self.buffer_length = sizeof(c_uint8)
def smallint_unsigned(self, value):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer = cast(pointer(c_uint16(value)), c_void_p)
self.buffer_length = sizeof(c_uint16)
def int_unsigned(self, value):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer = cast(pointer(c_uint32(value)), c_void_p)
self.buffer_length = sizeof(c_uint32)
def bigint_unsigned(self, value):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer = cast(pointer(c_uint64(value)), c_void_p)
self.buffer_length = sizeof(c_uint64)
def _datetime_to_timestamp(value, precision):
# type: (datetime | float | int | str | c_int64, PrecisionEnum) -> c_int64
if value is None:
return FieldType.C_BIGINT_NULL
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round((value - _datetime_epoch).total_seconds() * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
return int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round(value * 10000000))
else:
raise PrecisionError("time float do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
return c_int64(value)
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
return int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round(value * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif isinstance(value, c_int64):
return value
return FieldType.C_BIGINT_NULL
class TaosMultiBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_int32)),
("is_null", c_char_p),
("num", c_int),
]
def null(self, num):
self.buffer_type = FieldType.C_NULL
self.is_null = cast((c_char * num)(*[1 for _ in range(num)]), c_char_p)
self.buffer = c_void_p(None)
self.num = num
def bool(self, values):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BOOL_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
self.buffer_type = FieldType.C_BOOL
self.buffer_length = sizeof(c_bool)
def tinyint(self, values):
self.buffer_type = FieldType.C_TINYINT
self.buffer_length = sizeof(c_int8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint(self, values):
self.buffer_type = FieldType.C_SMALLINT
self.buffer_length = sizeof(c_int16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int(self, values):
self.buffer_type = FieldType.C_INT
self.buffer_length = sizeof(c_int32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint(self, values):
self.buffer_type = FieldType.C_BIGINT
self.buffer_length = sizeof(c_int64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def float(self, values):
self.buffer_type = FieldType.C_FLOAT
self.buffer_length = sizeof(c_float)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_float * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_FLOAT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def double(self, values):
self.buffer_type = FieldType.C_DOUBLE
self.buffer_length = sizeof(c_double)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_double * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_DOUBLE_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def _str_to_buffer(self, values):
self.num = len(values)
is_null = [1 if v is None else 0 for v in values]
self.is_null = cast((c_byte * self.num)(*is_null), c_char_p)
if sum(is_null) == self.num:
self.length = (c_int32 * len(values))(0 * self.num)
return
if sys.version_info < (3, 0):
_bytes = [bytes(value) if value is not None else None for value in values]
buffer_length = max(len(b) + 1 for b in _bytes if b is not None)
buffers = [
create_string_buffer(b, buffer_length) if b is not None else create_string_buffer(buffer_length)
for b in _bytes
]
buffer_all = b''.join(v[:] for v in buffers)
self.buffer = cast(c_char_p(buffer_all), c_void_p)
else:
_bytes = [value.encode("utf-8") if value is not None else None for value in values]
buffer_length = max(len(b) for b in _bytes if b is not None)
self.buffer = cast(
c_char_p(
b"".join(
[
create_string_buffer(b, buffer_length)
if b is not None
else create_string_buffer(buffer_length)
for b in _bytes
]
)
),
c_void_p,
)
self.length = (c_int32 * len(values))(*[len(b) if b is not None else 0 for b in _bytes])
self.buffer_length = buffer_length
def binary(self, values):
self.buffer_type = FieldType.C_BINARY
self._str_to_buffer(values)
def timestamp(self, values, precision=PrecisionEnum.Milliseconds):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values])
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(buffer, c_void_p)
self.buffer_length = sizeof(c_int64)
self.num = len(values)
def nchar(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_NCHAR
self._str_to_buffer(values)
def json(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_JSON
self._str_to_buffer(values)
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint_unsigned(self, values):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer_length = sizeof(c_uint16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int_unsigned(self, values):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer_length = sizeof(c_uint32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint_unsigned(self, values):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer_length = sizeof(c_uint64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def new_bind_param():
# type: () -> TaosBind
return TaosBind()
def new_bind_params(size):
# type: (int) -> Array[TaosBind]
return (TaosBind * size)()
def new_multi_bind():
# type: () -> TaosMultiBind
return TaosMultiBind()
def new_multi_binds(size):
# type: (int) -> Array[TaosMultiBind]
return (TaosMultiBind * size)()
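The helpers above are the entry points used by the statement examples elsewhere in this diff. A compressed sketch of filling bind arrays (values mirror the deleted `pytest_taos_stmt` examples; no server connection is needed just to build the structures):

```python
from taos import new_bind_params, new_multi_binds

# Row-at-a-time binds: one TaosBind per column.
params = new_bind_params(3)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].binary("hello")

# Column-at-a-time binds: one TaosMultiBind per column, one value per row.
mparams = new_multi_binds(3)
mparams[0].timestamp((1626861392589, 1626861392590))
mparams[1].bool((True, None))          # None becomes the C-level BOOL NULL sentinel
mparams[2].binary(["abc", None])
```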
This diff is collapsed.
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
class TaosConnection(object):
"""TDengine connection object"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if "host" in kwargs:
self._host = kwargs["host"]
# user
if "user" in kwargs:
self._user = kwargs["user"]
# password
if "password" in kwargs:
self._password = kwargs["password"]
# database
if "database" in kwargs:
self._database = kwargs["database"]
# port
if "port" in kwargs:
self._port = kwargs["port"]
# config
if "config" in kwargs:
self._config = kwargs["config"]
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
def close(self):
"""Close current connection."""
if self._conn:
taos_close(self._conn)
self._conn = None
@property
def client_info(self):
# type: () -> str
return taos_get_client_info()
@property
def server_info(self):
# type: () -> str
return taos_get_server_info(self._conn)
def select_db(self, database):
# type: (str) -> None
taos_select_db(self._conn, database)
def execute(self, sql):
# type: (str) -> int
"""Simplely execute sql ignoring the results"""
return self.query(sql).affected_rows
def query(self, sql):
# type: (str) -> TaosResult
result = taos_query(self._conn, sql)
return TaosResult(result, True, self)
def query_a(self, sql, callback, param):
# type: (str, async_query_callback_type, c_void_p) -> None
"""Asynchronously query a sql with callback function"""
taos_query_a(self._conn, sql, callback, param)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
# type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
"""Create a subscription."""
if self._conn is None:
return None
sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
return TaosSubscription(sub, callback != None)
def statement(self, sql=None):
# type: (str | None) -> TaosStmt
if self._conn is None:
return None
stmt = taos_stmt_init(self._conn)
if sql != None:
taos_stmt_prepare(stmt, sql)
return TaosStmt(stmt)
def load_table_info(self, tables):
# type: (str) -> None
taos_load_table_info(self._conn, tables)
def stream(self, sql, callback, stime=0, param=None, callback2=None):
# type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
# cb = cast(callback, stream_callback_type)
# ref = byref(cb)
stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
return TaosStream(stream)
def schemaless_insert(self, lines, protocol, precision):
# type: (list[str], SmlProtocol, SmlPrecision) -> int
"""
1.Line protocol and schemaless support
## Example
```python
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.schemaless_insert(lines, 0, "ns")
```
2.OpenTSDB telnet style API format support
## Example
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"',
]
conn.schemaless_insert(lines, 1, None)
3.OpenTSDB HTTP JSON format support
## Example
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
payload = ['''
{
"metric": "cpu_load_0",
"timestamp": 1626006833610123,
"value": 55.5,
"tags":
{
"host": "ubuntu",
"interface": "eth0",
"Id": "tb0"
}
}
''']
conn.schemaless_insert(payload, 2, None)
"""
print(lines, protocol, precision)
return taos_schemaless_insert(self._conn, lines, protocol, precision)
def cursor(self):
# type: () -> TaosCursor
"""Return a new Cursor object using the connection."""
return TaosCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine does not support transactions, this is a no-op.
"""
pass
def rollback(self):
"""Void functionality"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection."""
pass
def __del__(self):
self.close()
if __name__ == "__main__":
conn = TaosConnection()
conn.close()
print("Hello world")
# encoding:UTF-8
"""Constants in TDengine python
"""
import ctypes, struct
class FieldType(object):
"""TDengine Field Types"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
C_JSON = 15
# NULL value definition
# NOTE: These values must match the C definitions in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = ctypes.c_float(struct.unpack("<f", b"\x00\x00\xf0\x7f")[0])
C_DOUBLE_NULL = ctypes.c_double(struct.unpack("<d", b"\x00\x00\x00\x00\x00\xff\xff\x7f")[0])
C_BINARY_NULL = bytearray([int("0xff", 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
C_TIMESTAMP_NANO = 2
C_TIMESTAMP_UNKNOWN = 3
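For illustration (not part of the original file), these sentinel values are what the row converters in field.py below compare against when mapping raw C column values to Python `None`:

```python
from taos.constants import FieldType

def int_or_none(raw):
    # Mirror the converters in field.py: the C-level INT NULL sentinel becomes Python None.
    return None if raw == FieldType.C_INT_NULL else raw

print(int_or_none(FieldType.C_INT_NULL))  # None
print(int_or_none(42))                    # 42
```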
# encoding:UTF-8
from .cinterface import *
from .error import *
from .constants import FieldType
from .result import *
class TaosCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mandatory)
> type_code (mandatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount: This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected (for DML statements like INSERT).
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
return self._taos_next()
def next(self):
return self._taos_next()
def _taos_next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = taos_fetch_row(self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object."""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object"""
return self._rowcount
@property
def affected_rows(self):
"""Return the rowcount of insertion"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor."""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command)."""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid race condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = taos_query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if self._logfile:
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
if taos_field_count(self._result) == 0:
affected_rows = taos_affected_rows(self._result)
self._affected_rows += affected_rows
return affected_rows
else:
self._fields = taos_fetch_fields(self._result)
return self._handle_result()
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters."""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available."""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if dataType.upper() == "BOOL":
if self._description[col][1] == FieldType.C_BOOL:
return True
if dataType.upper() == "TINYINT":
if self._description[col][1] == FieldType.C_TINYINT:
return True
if dataType.upper() == "TINYINT UNSIGNED":
if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED:
return True
if dataType.upper() == "SMALLINT":
if self._description[col][1] == FieldType.C_SMALLINT:
return True
if dataType.upper() == "SMALLINT UNSIGNED":
if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED:
return True
if dataType.upper() == "INT":
if self._description[col][1] == FieldType.C_INT:
return True
if dataType.upper() == "INT UNSIGNED":
if self._description[col][1] == FieldType.C_INT_UNSIGNED:
return True
if dataType.upper() == "BIGINT":
if self._description[col][1] == FieldType.C_BIGINT:
return True
if dataType.upper() == "BIGINT UNSIGNED":
if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED:
return True
if dataType.upper() == "FLOAT":
if self._description[col][1] == FieldType.C_FLOAT:
return True
if dataType.upper() == "DOUBLE":
if self._description[col][1] == FieldType.C_DOUBLE:
return True
if dataType.upper() == "BINARY":
if self._description[col][1] == FieldType.C_BINARY:
return True
if dataType.upper() == "TIMESTAMP":
if self._description[col][1] == FieldType.C_TIMESTAMP:
return True
if dataType.upper() == "NCHAR":
if self._description[col][1] == FieldType.C_NCHAR:
return True
if dataType.upper() == "JSON":
if self._description[col][1] == FieldType.C_JSON:
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation."""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = taos_fetch_row(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None:
raise OperationalError("Invalid use of fetchall")
fields = self._fields if self._fields is not None else taos_fetch_fields(self._result)
buffer = [[] for i in range(len(fields))]
self._rowcount = 0
while True:
block, num_of_fields = taos_fetch_block(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def stop_query(self):
if self._result != None:
taos_stop_query(self._result)
def nextset(self):
""" """
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version."""
self._description = []
self._rowcount = -1
if self._result is not None:
taos_free_result(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query."""
self._description = []
for ele in self._fields:
self._description.append((ele["name"], ele["type"], None, None, None, None, False))
return self._result
def __del__(self):
self.close()
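A short DB-API style sketch of the cursor above, exercising `execute()`, `fetchall()`, and the `description`/`rowcount` attributes documented in the class docstring; the query and the connection defaults are placeholders.

```python
import taos

conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
rows = cursor.fetchall()                        # also populates rowcount
print([col[0] for col in cursor.description])   # column names from the 7-item sequences
print(cursor.rowcount, "rows fetched")
for row in rows:
    print(row)
cursor.close()
conn.close()
```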
# encoding:UTF-8
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=0xffff):
self.msg = msg
self.errno = errno
self._full_msg = "[0x%04x]: %s" % (self.errno & 0xffff, self.msg)
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting."""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself."""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database."""
pass
class ConnectionError(Error):
"""Exceptin raised for connection failed"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range."""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected."""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error."""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors."""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,."""
pass
class StatementError(DatabaseError):
"""Exception raised in STMT API."""
pass
class ResultError(DatabaseError):
"""Result related APIs."""
pass
class SchemalessError(DatabaseError):
"""taos_schemaless_insert errors."""
def __init__(self, msg=None, errno=0xffff, affected_rows=0):
DatabaseError.__init__(self, msg, errno)
self.affected_rows = affected_rows
def __str__(self):
return self._full_msg + "(affected rows: %d)" % self.affected_rows
# @property
# def affected_rows(self):
# return self.affected_rows
class StatementError(DatabaseError):
"""Exception raised in STMT API."""
pass
class ResultError(DatabaseError):
"""Result related APIs."""
pass
class LinesError(DatabaseError):
"""taos_insert_lines errors."""
pass
\ No newline at end of file
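A hedged sketch of catching the exception hierarchy above. Which concrete subclass is raised for a bad statement depends on the C-interface wrappers (collapsed earlier in this diff), so catching `DatabaseError`, the common base of the query-related errors defined here, is the safe pattern; the table name below is hypothetical.

```python
import taos
from taos.error import DatabaseError

conn = taos.connect()
cursor = conn.cursor()
try:
    cursor.execute("select * from not_a_real_table")  # hypothetical table name
except DatabaseError as err:
    print("query failed:", err)
finally:
    cursor.close()
    conn.close()
```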
# encoding:UTF-8
import ctypes
import math
import datetime
from ctypes import *
from .constants import FieldType
from .error import *
_datetime_epoch = datetime.datetime.fromtimestamp(0)
def _convert_millisecond_to_datetime(milli):
return _datetime_epoch + datetime.timedelta(seconds=milli / 1000.0)
def _convert_microsecond_to_datetime(micro):
return _datetime_epoch + datetime.timedelta(seconds=micro / 1000000.0)
def _convert_nanosecond_to_datetime(nanosec):
return nanosec
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
_timestamp_converter = _convert_millisecond_to_datetime
if precision == FieldType.C_TIMESTAMP_MILLI:
_timestamp_converter = _convert_millisecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_MICRO:
_timestamp_converter = _convert_microsecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_NANO:
_timestamp_converter = _convert_nanosecond_to_datetime
else:
raise DatabaseError("Unknown precision returned from database")
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bool_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ubyte))[: abs(num_of_rows)]
]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[: abs(num_of_rows)]
]
def _crow_smallint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ushort))[: abs(num_of_rows)]
]
def _crow_int_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[: abs(num_of_rows)]
]
def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint))[: abs(num_of_rows)]
]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bigint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint64))[: abs(num_of_rows)]
]
def _crow_float_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C float row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[: abs(num_of_rows)]
]
def _crow_double_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C double row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[: abs(num_of_rows)]
]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
return [
None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode("utf-8")
for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[: abs(num_of_rows)]
]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode("utf-8"))
else:
res.append(
(
ctypes.cast(
data + nbytes * i,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4)),
)
)[0].value
)
except ValueError:
res.append(None)
return res
def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte))
buffer = create_string_buffer(rbyte + 1)
buffer[:rbyte] = chars[0][:rbyte]
if rbyte == 1 and buffer[0] == b'\xff':
res.append(None)
else:
res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res
def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte))
buffer = create_string_buffer(rbyte + 1)
buffer[:rbyte] = chars[0][:rbyte]
if rbyte == 4 and buffer[:4] == b'\xff'*4:
res.append(None)
else:
res.append(cast(buffer, c_char_p).value.decode("utf-8"))
return res
CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python,
}
CONVERT_FUNC_BLOCK = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python_block,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python_block,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
FieldType.C_JSON: _crow_nchar_to_python_block,
}
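# Illustrative sketch of how these dispatch tables are used: each column of a
# fetched block is decoded by the converter registered for its field type.
# `block`, `length`, `lengths`, `fields` and `precision` are assumed to come from
# the C interface, as in TaosResult.fetch_block further below.
#
#     for i in range(field_count):
#         data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i]
#         converter = CONVERT_FUNC_BLOCK.get(fields[i].type)
#         if converter is None:
#             raise DatabaseError("Invalid data type returned from database")
#         columns[i] = converter(data, length, lengths[i], precision)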
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [
("_name", ctypes.c_char * 65),
("_type", ctypes.c_uint8),
("_bytes", ctypes.c_uint16),
]
@property
def name(self):
return self._name.decode("utf-8")
@property
def length(self):
"""alias to self.bytes"""
return self._bytes
@property
def bytes(self):
return self._bytes
@property
def type(self):
return self._type
def __dict__(self):
return {"name": self.name, "type": self.type, "bytes": self.length}
def __str__(self):
return "{name: %s, type: %d, bytes: %d}" % (self.name, self.type, self.length)
def __getitem__(self, item):
return getattr(self, item)
class TaosFields(object):
def __init__(self, fields, count):
if isinstance(fields, c_void_p):
self._fields = cast(fields, POINTER(TaosField))
if isinstance(fields, POINTER(TaosField)):
self._fields = fields
self._count = count
self._iter = 0
def as_ptr(self):
return self._fields
@property
def count(self):
return self._count
@property
def fields(self):
return self._fields
def __next__(self):
return self._next_field()
def next(self):
return self._next_field()
def _next_field(self):
if self._iter < self.count:
field = self._fields[self._iter]
self._iter += 1
return field
else:
raise StopIteration
def __getitem__(self, item):
return self._fields[item]
def __iter__(self):
return self
def __len__(self):
return self.count
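# Illustrative note: TaosFields is iterable, so field metadata can be inspected as
#
#     for field in taos_fetch_fields(result):
#         print(field.name, field.type, field.length)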
class PrecisionEnum(object):
"""Precision enums"""
Milliseconds = 0
Microseconds = 1
Nanoseconds = 2
class PrecisionError(Exception):
"""Python datetime does not support nanoseconds error"""
pass
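# Illustrative note: PrecisionEnum values are the integer precision codes expected
# by the bind API, e.g. (as in the stmt tests further below):
#
#     params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)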
from .cinterface import *
# from .connection import TaosConnection
from .error import *
from ctypes import c_void_p
class TaosResult(object):
"""TDengine result interface"""
def __init__(self, result, close_after=False, conn=None):
# type: (c_void_p, bool, TaosConnection) -> TaosResult
# to make the __del__ order right
self._conn = conn
self._close_after = close_after
if isinstance(result, c_void_p):
self._result = result
else:
self._result = c_void_p(result)
self._fields = None
self._field_count = None
self._precision = None
self._block = None
self._block_length = None
self._row_count = 0
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
# fetch next row
return self._next_row()
def _next_row(self):
if self._result is None or self.fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block is None or self._block_iter >= self._block_length:
self._block, self._block_length = self.fetch_block()
self._block_iter = 0
# self._row_count += self._block_length
raw = self._block[self._block_iter]
self._block_iter += 1
return raw
@property
def fields(self):
"""fields definitions of the current result"""
if self._result is None:
raise ResultError("no result object setted")
if self._fields is None:
self._fields = taos_fetch_fields(self._result)
return self._fields
@property
def field_count(self):
"""Field count of the current result, eq to taos_field_count(result)"""
return self.fields.count
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._row_count
@property
def precision(self):
if self._precision is None:
self._precision = taos_result_precision(self._result)
return self._precision
@property
def affected_rows(self):
return taos_affected_rows(self._result)
# @property
def field_lengths(self):
return taos_fetch_lengths(self._result, self.field_count)
def rows_iter(self, num_of_rows=None):
return TaosRows(self, num_of_rows)
def blocks_iter(self):
return TaosBlocks(self)
def fetch_block(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
block, length = taos_fetch_block_raw(self._result)
if length == 0:
raise StopIteration
precision = self.precision
field_count = self.field_count
fields = self.fields
blocks = [None] * field_count
lengths = self.field_lengths()
for i in range(field_count):
data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC_BLOCK:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = CONVERT_FUNC_BLOCK[fields[i].type](data, length, lengths[i], precision)
return list(map(tuple, zip(*blocks))), length
def fetch_all(self):
if self._result is None:
raise OperationalError("Invalid use of fetchall")
if self._fields is None:
self._fields = taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))]
self._row_count = 0
while True:
block, num_of_fields = taos_fetch_block(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._row_count += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetch_all_into_dict(self):
"""Fetch all rows and convert it to dict"""
names = [field.name for field in self.fields]
rows = self.fetch_all()
return list(dict(zip(names, row)) for row in rows)
def fetch_rows_a(self, callback, param):
taos_fetch_rows_a(self._result, callback, param)
def stop_query(self):
return taos_stop_query(self._result)
def errno(self):
"""**DO NOT** use this directly unless you know what you are doing"""
return taos_errno(self._result)
def errstr(self):
return taos_errstr(self._result)
def check_error(self, errno=None, close=True):
if errno is None:
errno = self.errno()
if errno != 0:
msg = self.errstr()
self.close()
raise OperationalError(msg, errno)
def close(self):
"""free result object."""
if self._result != None and self._close_after:
taos_free_result(self._result)
self._result = None
self._fields = None
self._field_count = None
self._field_lengths = None
def __del__(self):
self.close()
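# Illustrative usage sketch for TaosResult (assuming `result` was returned by a
# query on a TaosConnection):
#
#     for row in result:                    # iterate row by row (block-based fetch)
#         print(row)
#     rows = result.fetch_all()             # or fetch everything as a list of tuples
#     dicts = result.fetch_all_into_dict()  # or as a list of dicts keyed by field name
#     result.close()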
class TaosRows:
"""TDengine result rows iterator"""
def __init__(self, result, num_of_rows=None):
self._result = result
self._num_of_rows = num_of_rows
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
return self._next_row()
def _next_row(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
if self._num_of_rows != None and self._num_of_rows <= self._result._row_count:
raise StopIteration
row = taos_fetch_row_raw(self._result._result)
if not row:
raise StopIteration
self._result._row_count += 1
return TaosRow(self._result, row)
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._result._row_count
class TaosRow:
def __init__(self, result, row):
self._result = result
self._row = row
def __str__(self):
return taos_print_row(self._row, self._result.fields, self._result.field_count)
def __call__(self):
return self.as_tuple()
def _astuple(self):
return self.as_tuple()
def __iter__(self):
return iter(self.as_tuple())
def as_ptr(self):
return self._row
def as_tuple(self):
precision = self._result.precision
field_count = self._result.field_count
blocks = [None] * field_count
fields = self._result.fields
field_lens = self._result.field_lengths()
for i in range(field_count):
data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = None
else:
blocks[i] = CONVERT_FUNC[fields[i].type](data, 1, field_lens[i], precision)[0]
return tuple(blocks)
def as_dict(self):
values = self.as_tuple()
names = [field.name for field in self._result.fields]
return dict(zip(names, values))
class TaosBlocks:
"""TDengine result blocks iterator"""
def __init__(self, result):
self._result = result
def __iter__(self):
return self
def __next__(self):
return self._result.fetch_block()
def next(self):
return self._result.fetch_block()
class SmlPrecision:
"""Schemaless timestamp precision constants"""
NOT_CONFIGURED = 0 # C.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
HOURS = 1
MINUTES = 2
SECONDS = 3
MILLI_SECONDS = 4
MICRO_SECONDS = 5
NANO_SECONDS = 6
class SmlProtocol:
"""Schemaless protocol constants"""
UNKNOWN_PROTOCOL = 0
LINE_PROTOCOL = 1
TELNET_PROTOCOL = 2
JSON_PROTOCOL = 3
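# Illustrative sketch: these constants map to the integer protocol/precision
# arguments of TaosConnection.schemaless_insert (the tests further below pass the
# raw integers 1 and 0 for the same call):
#
#     lines = ['st,t1=3i64 c1=3i64 1626006833639000000']
#     conn.schemaless_insert(lines, SmlProtocol.LINE_PROTOCOL, SmlPrecision.NOT_CONFIGURED)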
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStmt(object):
"""TDengine STMT interface"""
def __init__(self, stmt, conn = None):
self._conn = conn
self._stmt = stmt
def set_tbname(self, name):
"""Set table name if needed.
Note that the set_tbname* methods should only be used in insert statements.
"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname(self._stmt, name)
def prepare(self, sql):
# type: (str) -> None
taos_stmt_prepare(self._stmt, sql)
def set_tbname_tags(self, name, tags):
# type: (str, Array[TaosBind]) -> None
"""Set table name with tags, tags is array of BindParams"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname_tags(self._stmt, name, tags)
def bind_param(self, params, add_batch=True):
# type: (Array[TaosBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param(self._stmt, params)
if add_batch:
taos_stmt_add_batch(self._stmt)
def bind_param_batch(self, binds, add_batch=True):
# type: (Array[TaosMultiBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param_batch(self._stmt, binds)
if add_batch:
taos_stmt_add_batch(self._stmt)
def add_batch(self):
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_add_batch(self._stmt)
def execute(self):
if self._stmt is None:
raise StatementError("Invalid use of execute")
taos_stmt_execute(self._stmt)
def use_result(self):
result = taos_stmt_use_result(self._stmt)
return TaosResult(result)
def close(self):
"""Close stmt."""
if self._stmt is None:
return
taos_stmt_close(self._stmt)
self._stmt = None
def __del__(self):
self.close()
if __name__ == "__main__":
from taos.connection import TaosConnection
conn = TaosConnection()
stmt = conn.statement("select * from log.log limit 10")
stmt.execute()
result = stmt.use_result()
for row in result:
print(row)
stmt.close()
conn.close()
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStream(object):
"""TDengine Stream interface"""
def __init__(self, stream):
self._raw = stream
def as_ptr(self):
return self._raw
def close(self):
"""Close stmt."""
if self._raw is not None:
taos_close_stream(self._raw)
self._raw = None
def __del__(self):
self.close()
from taos.result import TaosResult
from .cinterface import *
from .error import *
class TaosSubscription(object):
"""TDengine subscription object"""
def __init__(self, sub, with_callback = False):
self._sub = sub
self._with_callback = with_callback
def consume(self):
"""Consume rows of a subscription"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
if self._with_callback:
raise OperationalError("DONOT use consume method in an subscription with callback")
result = taos_consume(self._sub)
return TaosResult(result)
def close(self, keepProgress=True):
"""Close the Subscription."""
if self._sub is None:
return False
taos_unsubscribe(self._sub, keepProgress)
self._sub = None
return True
def __del__(self):
self.close()
if __name__ == "__main__":
from .connection import TaosConnection
conn = TaosConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
class TimestampType(object):
"""Choose which type that parsing TDengine timestamp data to
- DATETIME: use python datetime.datetime, note that it does not support nanosecond precision,
and python taos will use raw c_int64 as a fallback for nanosecond results.
- NUMPY: use numpy.datetime64 type.
- RAW: use raw c_int64.
- TAOS: use taos' TaosTimestamp.
"""
DATETIME = 0,
NUMPY = 1,
RAW = 2,
TAOS = 3,
class TaosTimestamp:
pass
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
# params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[14].nchar([None, None, None])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
from taos.cinterface import *
from taos.precision import *
from taos.bind import *
import time
import datetime
import pytest
@pytest.fixture
def conn():
return CTaosInterface().connect()
def test_simple(conn, caplog):
dbname = "pytest_ctaos_simple"
try:
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, level tinyint, content binary(100), ipaddr binary(134))",
)
taos_free_result(res)
res = taos_query(conn, "insert into log values(now, 1, 'hello', 'test')")
taos_free_result(res)
res = taos_query(conn, "select level,content,ipaddr from log limit 1")
fields = taos_fetch_fields_raw(res)
field_count = taos_field_count(res)
fields = taos_fetch_fields(res)
for field in fields:
print(field)
# field_lengths = taos_fetch_lengths(res, field_count)
# if not field_lengths:
# raise "fetch lengths error"
row = taos_fetch_row_raw(res)
rowstr = taos_print_row(row, fields, field_count)
assert rowstr == "1 hello test"
row, num = taos_fetch_row(res, fields)
print(row)
taos_free_result(res)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
def test_stmt(conn, caplog):
dbname = "pytest_ctaos_stmt"
try:
res = taos_query(conn, "drop database if exists %s" % dbname)
taos_free_result(res)
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100))",
)
taos_free_result(res)
stmt = taos_stmt_init(conn)
taos_stmt_prepare(stmt, "insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(14)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].null()
params[2].tinyint(2)
params[3].smallint(3)
params[4].int(4)
params[5].bigint(5)
params[6].tinyint_unsigned(6)
params[7].smallint_unsigned(7)
params[8].int_unsigned(8)
params[9].bigint_unsigned(9)
params[10].float(10.1)
params[11].double(10.11)
params[12].binary("hello")
params[13].nchar("stmt")
taos_stmt_bind_param(stmt, params)
taos_stmt_add_batch(stmt)
taos_stmt_execute(stmt)
res = taos_query(conn, "select * from log limit 1")
fields = taos_fetch_fields(res)
field_count = taos_field_count(res)
row = taos_fetch_row_raw(res)
rowstr = taos_print_row(row, fields, field_count, 100)
taos_free_result(res)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
assert rowstr == "1626861392589 NULL 2 3 4 5 6 7 8 9 10.100000 10.110000 hello stmt"
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
def stream_callback(param, result, row):
# type: (c_void_p, c_void_p, c_void_p) -> None
try:
if result == None or row == None:
return
result = c_void_p(result)
row = c_void_p(row)
fields = taos_fetch_fields_raw(result)
num_fields = taos_field_count(result)
s = taos_print_row(row, fields, num_fields)
print(s)
taos_stop_query(result)
except Exception as err:
print(err)
def test_stream(conn, caplog):
dbname = "pytest_ctaos_stream"
try:
res = taos_query(conn, "create database if not exists %s" % dbname)
taos_free_result(res)
taos_select_db(conn, dbname)
res = taos_query(
conn,
"create table if not exists log(ts timestamp, n int)",
)
taos_free_result(res)
res = taos_query(conn, "select count(*) from log interval(5s)")
cc = taos_num_fields(res)
assert cc == 2
stream = taos_open_stream(conn, "select count(*) from log interval(5s)", stream_callback, 0, None, None)
print("waiting for data")
time.sleep(1)
for i in range(0, 2):
res = taos_query(conn, "insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
taos_free_result(res)
time.sleep(2)
taos_close_stream(stream)
taos_query(conn, "drop database if exists " + dbname)
taos_close(conn)
except Exception as err:
taos_query(conn, "drop database if exists " + dbname)
raise err
from taos.cinterface import *
from taos import *
import pytest
@pytest.fixture
def conn():
return connect()
def test_client_info():
print(taos_get_client_info())
def test_server_info(conn):
# type: (TaosConnection) -> None
print(conn.client_info)
print(conn.server_info)
if __name__ == "__main__":
test_client_info()
test_server_info(connect())
from taos.error import OperationalError, SchemalessError
from taos import connect, new_bind_params, PrecisionEnum
from taos import *
from ctypes import *
import taos
import pytest
@pytest.fixture
def conn():
# type: () -> taos.TaosConnection
return connect()
def test_schemaless_insert_update_2(conn):
# type: (TaosConnection) -> None
dbname = "test_schemaless_insert_update_2"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
]
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 1)
result = conn.query("select * from st")
[before] = result.fetch_all_into_dict()
assert(before["c3"] == "passitagin, abc")
lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
]
res = conn.schemaless_insert(lines, 1, 0)
result = conn.query("select * from st")
[after] = result.fetch_all_into_dict()
assert(after["c3"] == "passitagin")
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
raise err
def test_schemaless_insert(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_schemaless_insert"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 3)
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 1)
result = conn.query("select * from st")
dict2 = result.fetch_all_into_dict()
print(dict2)
result.row_count
all = result.rows_iter()
for row in all:
print(row)
result.close()
assert(result.row_count == 2)
# error test
lines = [
',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000',
]
try:
res = conn.schemaless_insert(lines, 1, 0)
print(res)
# assert(False)
except SchemalessError as err:
pass
result = conn.query("select * from st")
result.row_count
all = result.rows_iter()
for row in all:
print(row)
result.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
raise err
if __name__ == "__main__":
test_schemaless_insert(connect())
test_schemaless_insert_update_2(connect())
from datetime import datetime
import taos
import pytest
@pytest.fixture
def conn():
return taos.connect()
def test_query(conn):
"""This test will use fetch_block for rows fetching, significantly faster than rows_iter"""
result = conn.query("select * from log.log limit 10000")
fields = result.fields
for field in fields:
print("field: %s" % field)
start = datetime.now()
for row in result:
# print(row)
pass
end = datetime.now()
elapsed = end - start
print("elapsed time: ", elapsed)
result.close()
conn.close()
def test_query_row_iter(conn):
"""This test will use fetch_row for each row fetching, this is the only way in async callback"""
result = conn.query("select * from log.log limit 10000")
fields = result.fields
for field in fields:
print("field: %s" % field)
start = datetime.now()
for row in result.rows_iter():
# print(row)
pass
end = datetime.now()
elapsed = end - start
print("elapsed time: ", elapsed)
result.close()
conn.close()
if __name__ == "__main__":
test_query(taos.connect(database = "log"))
test_query_row_iter(taos.connect(database = "log"))
from taos import *
from ctypes import *
import taos
import pytest
import time
@pytest.fixture
def conn():
return taos.connect()
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
pass
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(taos.connect())
# encoding:UTF-8
from taos import *
from ctypes import *
from datetime import datetime
import taos
import pytest
@pytest.fixture
def conn():
# type: () -> taos.TaosConnection
return connect()
def test_stmt_insert(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
conn.load_table_info("log")
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 1
result.close()
stmt.close()
stmt = conn.statement("select * from log")
stmt.execute()
result = stmt.use_result()
row = result.next()
print(row)
assert row[2] == None
for i in range(3, 11):
assert row[i] == i - 1
#float == may not work as expected
# assert row[10] == c_float(10.1)
assert row[12] == 10.11
assert row[13] == "hello"
assert row[14] == "stmt"
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def test_stmt_insert_multi(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt_multi"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
conn.load_table_info("log")
start = datetime.now()
stmt = conn.statement("insert into log values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
end = datetime.now()
print("elapsed time: ", end - start)
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
stmt.close()
stmt = conn.statement("select * from log")
stmt.execute()
result = stmt.use_result()
for row in result:
print(row)
result.close()
stmt.close()
# start = datetime.now()
# conn.query("insert into log values(1626861392660, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)(1626861392661, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)(1626861392662, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)")
# end = datetime.now()
# print("elapsed time: ", end - start)
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stmt_insert(connect())
test_stmt_insert_multi(connect())
from taos.cinterface import *
from taos.precision import *
from taos.bind import *
from taos import *
from ctypes import *
import time
import pytest
@pytest.fixture
def conn():
return connect()
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row.as_tuple()
print(ts, count)
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
from taos.subscription import TaosSubscription
from taos import *
from ctypes import *
import taos
import pytest
import time
from random import random
@pytest.fixture
def conn():
return taos.connect()
def test_subscribe(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.execute("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("callback")
result = TaosResult(c_void_p(p_result))
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.execute("use %s" % dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe(taos.connect())
test_subscribe_callback(taos.connect())