diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d93ed9df5b70ce791c444005316bc279ef82666f..662cdb701166ad07ea6f2129dd5be94167b77023 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -21,21 +21,25 @@ def sync_source() { if (env.CHANGE_TARGET == 'master') { sh ''' cd ${WKC} + git clean -fxd git checkout master ''' } else if (env.CHANGE_TARGET == '2.0') { sh ''' cd ${WKC} + git clean -fxd git checkout 2.0 ''' } else if (env.CHANGE_TARGET == '2.4') { sh ''' cd ${WKC} + git clean -fxd git checkout 2.4 ''' } else { sh ''' cd ${WKC} + git clean -fxd git checkout develop ''' } @@ -83,6 +87,16 @@ def sync_source() { cd ${WKC} git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD + + if [ ! -d src/connector/python/.github ]; then + rm -rf src/connector/python/* || : + rm -rf src/connector/python/.* || : + git clone --depth 1 https://github.com/taosdata/taos-connector-python src/connector/python || echo "failed to clone python connector" + else + cd src/connector/python || echo "src/connector/python not exist" + git pull || : + cd ${WK} + fi ''' } else if (env.CHANGE_URL =~ /\/TDinternal\//) { sh ''' @@ -90,6 +104,16 @@ def sync_source() { cd ${WK} git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD + + if [ ! -d community/src/connector/python/.github ]; then + rm -rf community/src/connector/python/* || : + rm -rf community/src/connector/python/.* || : + git clone --depth 1 https://github.com/taosdata/taos-connector-python community/src/connector/python || echo "failed to clone python connector" + else + cd community/src/connector/python || echo "community/src/connector/python not exist" + git pull || : + cd ${WK} + fi ''' } else { sh ''' diff --git a/src/connector/python/LICENSE b/src/connector/python/LICENSE deleted file mode 100644 index 79a9d730868bfe5d3fa01d679a4abfe9ee7811f0..0000000000000000000000000000000000000000 --- a/src/connector/python/LICENSE +++ /dev/null @@ -1,12 +0,0 @@ - Copyright (c) 2019 TAOS Data, Inc. - -This program is free software: you can use, redistribute, and/or modify -it under the terms of the GNU Affero General Public License, version 3 -or later ("AGPL"), as published by the Free Software Foundation. - -This program is distributed in the hope that it will be useful, but WITHOUT -ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -FITNESS FOR A PARTICULAR PURPOSE. - -You should have received a copy of the GNU Affero General Public License -along with this program. If not, see . diff --git a/src/connector/python/README.md b/src/connector/python/README.md index e4fce45a59bea74ea64a5da082767579459d4196..c48a2dac4ba52324f761e29ea4fc72d186a30309 100644 --- a/src/connector/python/README.md +++ b/src/connector/python/README.md @@ -17,12 +17,6 @@ Or with git url: pip install git+https://github.com/taosdata/taos-connector-python.git ``` -If you have installed TDengine server or client with prebuilt packages, then you can install the connector from path: - -```bash -pip install /usr/local/taos/connector/python -``` - ## Source Code [TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/taos-connector-python). @@ -361,71 +355,6 @@ if __name__ == "__main__": test_subscribe_callback(connect()) ``` -### Stream - -```python -from taos import * -from ctypes import * -import time - -def stream_callback(p_param, p_result, p_row): - # type: (c_void_p, c_void_p, c_void_p) -> None - if p_result is None or p_row is None: - return - result = TaosResult(p_result) - row = TaosRow(result, p_row) - try: - ts, count = row() - p = cast(p_param, POINTER(Counter)) - p.contents.count += count - print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) - except Exception as err: - print(err) - raise err - - -class Counter(Structure): - _fields_ = [ - ("count", c_int), - ] - - def __str__(self): - return "%d" % self.count - - -def test_stream(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_stream" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.execute("create table if not exists log(ts timestamp, n int)") - - result = conn.query("select count(*) from log interval(5s)") - assert result.field_count == 2 - counter = Counter() - counter.count = 0 - stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter)) - - for _ in range(0, 20): - conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") - time.sleep(2) - - stream.close() - conn.execute("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_stream(connect()) - -``` - ### Insert with line protocol ```python diff --git a/src/connector/python/examples/bind-multi.py b/src/connector/python/examples/bind-multi.py deleted file mode 100644 index 8530253aef58079e01f5eb71d8e12ab1649b7731..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/bind-multi.py +++ /dev/null @@ -1,50 +0,0 @@ -# encoding:UTF-8 -from taos import * - -conn = connect() - -dbname = "pytest_taos_stmt_multi" -conn.execute("drop database if exists %s" % dbname) -conn.execute("create database if not exists %s" % dbname) -conn.select_db(dbname) - -conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ - ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ - su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", -) - -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - -params = new_multi_binds(16) -params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) -params[1].bool((True, None, False)) -params[2].tinyint([-128, -128, None]) # -128 is tinyint null -params[3].tinyint([0, 127, None]) -params[4].smallint([3, None, 2]) -params[5].int([3, 4, None]) -params[6].bigint([3, 4, None]) -params[7].tinyint_unsigned([3, 4, None]) -params[8].smallint_unsigned([3, 4, None]) -params[9].int_unsigned([3, 4, None]) -params[10].bigint_unsigned([3, 4, None]) -params[11].float([3, None, 1]) -params[12].double([3, None, 1.2]) -params[13].binary(["abc", "dddafadfadfadfadfa", None]) -params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) -params[15].timestamp([None, None, 1626861392591]) -stmt.bind_param_batch(params) -stmt.execute() - - -result = stmt.use_result() -assert result.affected_rows == 3 -result.close() - -result = conn.query("select * from log") -for row in result: - print(row) -result.close() -stmt.close() -conn.close() \ No newline at end of file diff --git a/src/connector/python/examples/bind-row.py b/src/connector/python/examples/bind-row.py deleted file mode 100644 index 4ab9a9167ad23a6167c6586aac30ae6941dcee6d..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/bind-row.py +++ /dev/null @@ -1,57 +0,0 @@ -from taos import * - -conn = connect() - -dbname = "pytest_taos_stmt" -conn.execute("drop database if exists %s" % dbname) -conn.execute("create database if not exists %s" % dbname) -conn.select_db(dbname) - -conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ - ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ - su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", -) - -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - -params = new_bind_params(16) -params[0].timestamp(1626861392589) -params[1].bool(True) -params[2].null() -params[3].tinyint(2) -params[4].smallint(3) -params[5].int(4) -params[6].bigint(5) -params[7].tinyint_unsigned(6) -params[8].smallint_unsigned(7) -params[9].int_unsigned(8) -params[10].bigint_unsigned(9) -params[11].float(10.1) -params[12].double(10.11) -params[13].binary("hello") -params[14].nchar("stmt") -params[15].timestamp(1626861392589) -stmt.bind_param(params) - -params[0].timestamp(1626861392590) -params[15].null() -stmt.bind_param(params) -stmt.execute() - - -result = stmt.use_result() -assert result.affected_rows == 2 -# No need to explicitly close, but ok for you -# result.close() - -result = conn.query("select * from log") - -for row in result: - print(row) - -# No need to explicitly close, but ok for you -# result.close() -# stmt.close() -# conn.close() diff --git a/src/connector/python/examples/demo.py b/src/connector/python/examples/demo.py deleted file mode 100644 index 3bc09046f3a33557e513425c06373c66958f2a2f..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/demo.py +++ /dev/null @@ -1,12 +0,0 @@ -import taos - -conn = taos.connect(host='127.0.0.1', - user='root', - password='taosdata', - database='log') -cursor = conn.cursor() - -sql = "select * from log.log limit 10" -cursor.execute(sql) -for row in cursor: - print(row) diff --git a/src/connector/python/examples/insert-lines.py b/src/connector/python/examples/insert-lines.py deleted file mode 100644 index 1d20af7e9bcac23deb70c1dbd058bb86dd5585a5..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/insert-lines.py +++ /dev/null @@ -1,23 +0,0 @@ -import taos -from taos import SmlProtocol, SmlPrecision - -conn = taos.connect() -dbname = "pytest_line" -conn.execute("drop database if exists %s" % dbname) -conn.execute("create database if not exists %s precision 'us'" % dbname) -conn.select_db(dbname) - -lines = [ - 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', -] -conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) -print("inserted") - -conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) - -result = conn.query("show tables") -for row in result: - print(row) - - -conn.execute("drop database if exists %s" % dbname) diff --git a/src/connector/python/examples/pep-249.py b/src/connector/python/examples/pep-249.py deleted file mode 100644 index 971a3c401f00b982096b8d429f65bce73cca4760..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/pep-249.py +++ /dev/null @@ -1,9 +0,0 @@ -import taos - -conn = taos.connect() -cursor = conn.cursor() - -cursor.execute("show databases") -results = cursor.fetchall() -for row in results: - print(row) diff --git a/src/connector/python/examples/query-async.py b/src/connector/python/examples/query-async.py deleted file mode 100644 index 585db2344eda4c5d38c2868c35f4d91c50926880..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/query-async.py +++ /dev/null @@ -1,62 +0,0 @@ -from taos import * -from ctypes import * -import time - -def fetch_callback(p_param, p_result, num_of_rows): - print("fetched ", num_of_rows, "rows") - p = cast(p_param, POINTER(Counter)) - result = TaosResult(p_result) - - if num_of_rows == 0: - print("fetching completed") - p.contents.done = True - # should explicitly close the result in fetch completed or cause error - result.close() - return - if num_of_rows < 0: - p.contents.done = True - result.check_error(num_of_rows) - result.close() - return None - - for row in result.rows_iter(num_of_rows): - # print(row) - None - p.contents.count += result.row_count - result.fetch_rows_a(fetch_callback, p_param) - - - -def query_callback(p_param, p_result, code): - # type: (c_void_p, c_void_p, c_int) -> None - if p_result is None: - return - result = TaosResult(p_result) - if code == 0: - result.fetch_rows_a(fetch_callback, p_param) - result.check_error(code) - # explicitly close result while query failed - result.close() - - -class Counter(Structure): - _fields_ = [("count", c_int), ("done", c_bool)] - - def __str__(self): - return "{ count: %d, done: %s }" % (self.count, self.done) - - -def test_query(conn): - # type: (TaosConnection) -> None - counter = Counter(count=0) - conn.query_a("select * from log.log", query_callback, byref(counter)) - - while not counter.done: - print("wait query callback") - time.sleep(1) - print(counter) - # conn.close() - - -if __name__ == "__main__": - test_query(connect()) \ No newline at end of file diff --git a/src/connector/python/examples/query-objectively.py b/src/connector/python/examples/query-objectively.py deleted file mode 100644 index 104347cbf91e29e62fef26477b475053a8b8bc3e..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/query-objectively.py +++ /dev/null @@ -1,12 +0,0 @@ -import taos - -conn = taos.connect() -conn.execute("create database if not exists pytest") - -result = conn.query("show databases") -num_of_fields = result.field_count -for field in result.fields: - print(field) -for row in result: - print(row) -conn.execute("drop database pytest") diff --git a/src/connector/python/examples/stream.py b/src/connector/python/examples/stream.py deleted file mode 100644 index 73cbd03c493f4441d661f924bf648bc8992aeb0a..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/stream.py +++ /dev/null @@ -1,59 +0,0 @@ -from taos import * -from ctypes import * -import time - -def stream_callback(p_param, p_result, p_row): - # type: (c_void_p, c_void_p, c_void_p) -> None - if p_result is None or p_row is None: - return - result = TaosResult(p_result) - row = TaosRow(result, p_row) - try: - ts, count = row() - p = cast(p_param, POINTER(Counter)) - p.contents.count += count - print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) - except Exception as err: - print(err) - raise err - - -class Counter(Structure): - _fields_ = [ - ("count", c_int), - ] - - def __str__(self): - return "%d" % self.count - - -def test_stream(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_stream" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.execute("create table if not exists log(ts timestamp, n int)") - - result = conn.query("select count(*) from log interval(5s)") - assert result.field_count == 2 - counter = Counter() - counter.count = 0 - stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter)) - - for _ in range(0, 20): - conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") - time.sleep(2) - - stream.close() - conn.execute("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_stream(connect()) diff --git a/src/connector/python/examples/subscribe-async.py b/src/connector/python/examples/subscribe-async.py deleted file mode 100644 index 49156de7edfb4322d7888727c28b76868cf6a16a..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/subscribe-async.py +++ /dev/null @@ -1,46 +0,0 @@ -from taos import * -from ctypes import * - -import time - - -def subscribe_callback(p_sub, p_result, p_param, errno): - # type: (c_void_p, c_void_p, c_void_p, c_int) -> None - print("# fetch in callback") - result = TaosResult(c_void_p(p_result)) - result.check_error(errno) - for row in result.rows_iter(): - ts, n = row() - print(ts, n) - - -def test_subscribe_callback(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_subscribe_callback" - try: - print("drop if exists") - conn.execute("drop database if exists %s" % dbname) - print("create database") - conn.execute("create database if not exists %s" % dbname) - print("create table") - # conn.execute("use %s" % dbname) - conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname) - - print("# subscribe with callback") - sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback) - - for i in range(10): - conn.execute("insert into %s.log values(now, %d)" % (dbname, i)) - time.sleep(0.7) - sub.close() - - conn.execute("drop database if exists %s" % dbname) - # conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - # conn.close() - raise err - - -if __name__ == "__main__": - test_subscribe_callback(connect()) diff --git a/src/connector/python/examples/subscribe-sync.py b/src/connector/python/examples/subscribe-sync.py deleted file mode 100644 index 3a7f65f460280924ed3a577fe55b975fbf12c1a3..0000000000000000000000000000000000000000 --- a/src/connector/python/examples/subscribe-sync.py +++ /dev/null @@ -1,53 +0,0 @@ -import taos -import random - -conn = taos.connect() -dbname = "pytest_taos_subscribe" -conn.execute("drop database if exists %s" % dbname) -conn.execute("create database if not exists %s" % dbname) -conn.select_db(dbname) -conn.execute("create table if not exists log(ts timestamp, n int)") -for i in range(10): - conn.execute("insert into log values(now, %d)" % i) - -sub = conn.subscribe(False, "test", "select * from log", 1000) -print("# consume from begin") -for ts, n in sub.consume(): - print(ts, n) - -print("# consume new data") -for i in range(5): - conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i)) - result = sub.consume() - for ts, n in result: - print(ts, n) - -sub.close(True) -print("# keep progress consume") -sub = conn.subscribe(False, "test", "select * from log", 1000) -result = sub.consume() -rows = result.fetch_all() -# consume from latest subscription needs root privilege(for /var/lib/taos). -assert result.row_count == 0 -print("## consumed ", len(rows), "rows") - -print("# consume with a stop condition") -for i in range(10): - conn.execute("insert into log values(now, %d)" % random.randint(0, 10)) - result = sub.consume() - try: - ts, n = next(result) - print(ts, n) - if n > 5: - result.stop_query() - print("## stopped") - break - except StopIteration: - continue - -sub.close() - -# sub.close() - -conn.execute("drop database if exists %s" % dbname) -# conn.close() diff --git a/src/connector/python/pyproject.toml b/src/connector/python/pyproject.toml deleted file mode 100644 index 69e3351712b647712a88d7067545ea12ed86506d..0000000000000000000000000000000000000000 --- a/src/connector/python/pyproject.toml +++ /dev/null @@ -1,30 +0,0 @@ -[tool.poetry] -name = "taospy" -version = "2.1.2" -description = "TDengine connector for python" -authors = ["Taosdata Inc. "] -license = "AGPL-3.0" -readme = "README.md" -packages = [ - {include = "taos"} -] - -[tool.poetry.dependencies] -python = "^2.7 || ^3.4" -typing = "*" - -[tool.poetry.dev-dependencies] -pytest = [ - { version = "^4.6", python = ">=2.7,<3.0" }, - { version = "^6.2", python = ">=3.7,<4.0" } -] -pdoc = { version = "^7.1.1", python = "^3.7" } -mypy = { version = "^0.910", python = "^3.6" } -black = [{ version = "^21.*", python = ">=3.6.2,<4.0" }] - -[build-system] -requires = ["poetry-core>=1.0.0"] -build-backend = "poetry.core.masonry.api" - -[tool.black] -line-length = 119 diff --git a/src/connector/python/setup.py b/src/connector/python/setup.py deleted file mode 100644 index 8f1dfafe4762e4a55a6d3e7c645c945a67a10f68..0000000000000000000000000000000000000000 --- a/src/connector/python/setup.py +++ /dev/null @@ -1,34 +0,0 @@ -import setuptools - -with open("README.md", "r") as fh: - long_description = fh.read() - -setuptools.setup( - name="taos", - version="2.1.1", - author="Taosdata Inc.", - author_email="support@taosdata.com", - description="TDengine python client package", - long_description=long_description, - long_description_content_type="text/markdown", - url="https://github.com/taosdata/TDengine/tree/develop/src/connector/python", - packages=setuptools.find_packages(), - classifiers=[ - "Environment :: Console", - "Environment :: MacOS X", - "Environment :: Win32 (MS Windows)", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", - "Operating System :: MacOS", - "Programming Language :: Python :: 2.7", - "Operating System :: Linux", - "Operating System :: POSIX :: Linux", - "Operating System :: Microsoft :: Windows", - "Operating System :: Microsoft :: Windows :: Windows 10", - ], -) diff --git a/src/connector/python/taos/__init__.py b/src/connector/python/taos/__init__.py deleted file mode 100644 index 739265ef579b6a5127df8ee592b73293f113a2ef..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/__init__.py +++ /dev/null @@ -1,485 +0,0 @@ -# encoding:UTF-8 -""" -# TDengine Connector for Python - -[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine, - using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications. - -## Install - -```sh -git clone --depth 1 https://github.com/taosdata/TDengine.git -pip install ./TDengine/src/connector/python -``` - -## Source Code - -[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python). - -## Examples - -### Query with PEP-249 API - -```python -import taos - -conn = taos.connect() -cursor = conn.cursor() - -cursor.execute("show databases") -results = cursor.fetchall() -for row in results: - print(row) -cursor.close() -conn.close() -``` - -### Query with objective API - -```python -import taos - -conn = taos.connect() -conn.exec("create database if not exists pytest") - -result = conn.query("show databases") -num_of_fields = result.field_count -for field in result.fields: - print(field) -for row in result: - print(row) -result.close() -conn.exec("drop database pytest") -conn.close() -``` - -### Query with async API - -```python -from taos import * -from ctypes import * -import time - -def fetch_callback(p_param, p_result, num_of_rows): - print("fetched ", num_of_rows, "rows") - p = cast(p_param, POINTER(Counter)) - result = TaosResult(p_result) - - if num_of_rows == 0: - print("fetching completed") - p.contents.done = True - result.close() - return - if num_of_rows < 0: - p.contents.done = True - result.check_error(num_of_rows) - result.close() - return None - - for row in result.rows_iter(num_of_rows): - # print(row) - None - p.contents.count += result.row_count - result.fetch_rows_a(fetch_callback, p_param) - - - -def query_callback(p_param, p_result, code): - # type: (c_void_p, c_void_p, c_int) -> None - if p_result is None: - return - result = TaosResult(p_result) - if code == 0: - result.fetch_rows_a(fetch_callback, p_param) - result.check_error(code) - - -class Counter(Structure): - _fields_ = [("count", c_int), ("done", c_bool)] - - def __str__(self): - return "{ count: %d, done: %s }" % (self.count, self.done) - - -def test_query(conn): - # type: (TaosConnection) -> None - counter = Counter(count=0) - conn.query_a("select * from log.log", query_callback, byref(counter)) - - while not counter.done: - print("wait query callback") - time.sleep(1) - print(counter) - conn.close() - - -if __name__ == "__main__": - test_query(connect()) -``` - -### Statement API - Bind row after row - -```python -from taos import * - -conn = connect() - -dbname = "pytest_taos_stmt" -conn.exec("drop database if exists %s" % dbname) -conn.exec("create database if not exists %s" % dbname) -conn.select_db(dbname) - -conn.exec( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, \\ - ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\ - su smallint unsigned, iu int unsigned, bu bigint unsigned, \\ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", -) - -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - -params = new_bind_params(16) -params[0].timestamp(1626861392589) -params[1].bool(True) -params[2].null() -params[3].tinyint(2) -params[4].smallint(3) -params[5].int(4) -params[6].bigint(5) -params[7].tinyint_unsigned(6) -params[8].smallint_unsigned(7) -params[9].int_unsigned(8) -params[10].bigint_unsigned(9) -params[11].float(10.1) -params[12].double(10.11) -params[13].binary("hello") -params[14].nchar("stmt") -params[15].timestamp(1626861392589) -stmt.bind_param(params) - -params[0].timestamp(1626861392590) -params[15].null() -stmt.bind_param(params) -stmt.execute() - - -result = stmt.use_result() -assert result.affected_rows == 2 -result.close() - -result = conn.query("select * from log") - -for row in result: - print(row) -result.close() -stmt.close() -conn.close() - -``` - -### Statement API - Bind multi rows - -```python -from taos import * - -conn = connect() - -dbname = "pytest_taos_stmt" -conn.exec("drop database if exists %s" % dbname) -conn.exec("create database if not exists %s" % dbname) -conn.select_db(dbname) - -conn.exec( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, \\ - ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\ - su smallint unsigned, iu int unsigned, bu bigint unsigned, \\ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", -) - -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - -params = new_multi_binds(16) -params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) -params[1].bool((True, None, False)) -params[2].tinyint([-128, -128, None]) # -128 is tinyint null -params[3].tinyint([0, 127, None]) -params[4].smallint([3, None, 2]) -params[5].int([3, 4, None]) -params[6].bigint([3, 4, None]) -params[7].tinyint_unsigned([3, 4, None]) -params[8].smallint_unsigned([3, 4, None]) -params[9].int_unsigned([3, 4, None]) -params[10].bigint_unsigned([3, 4, None]) -params[11].float([3, None, 1]) -params[12].double([3, None, 1.2]) -params[13].binary(["abc", "dddafadfadfadfadfa", None]) -params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) -params[15].timestamp([None, None, 1626861392591]) -stmt.bind_param_batch(params) -stmt.execute() - - -result = stmt.use_result() -assert result.affected_rows == 3 -result.close() - -result = conn.query("select * from log") -for row in result: - print(row) -result.close() -stmt.close() -conn.close() -``` - -### Statement API - Subscribe - -```python -import taos - -conn = taos.connect() -dbname = "pytest_taos_subscribe_callback" -conn.exec("drop database if exists %s" % dbname) -conn.exec("create database if not exists %s" % dbname) -conn.select_db(dbname) -conn.exec("create table if not exists log(ts timestamp, n int)") -for i in range(10): - conn.exec("insert into log values(now, %d)" % i) - -sub = conn.subscribe(True, "test", "select * from log", 1000) -print("# consume from begin") -for ts, n in sub.consume(): - print(ts, n) - -print("# consume new data") -for i in range(5): - conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i)) - result = sub.consume() - for ts, n in result: - print(ts, n) - -print("# consume with a stop condition") -for i in range(10): - conn.exec("insert into log values(now, %d)" % int(random() * 10)) - result = sub.consume() - try: - ts, n = next(result) - print(ts, n) - if n > 5: - result.stop_query() - print("## stopped") - break - except StopIteration: - continue - -sub.close() - -conn.exec("drop database if exists %s" % dbname) -conn.close() -``` - -### Statement API - Subscribe asynchronously with callback - -```python -from taos import * -from ctypes import * - -import time - - -def subscribe_callback(p_sub, p_result, p_param, errno): - # type: (c_void_p, c_void_p, c_void_p, c_int) -> None - print("# fetch in callback") - result = TaosResult(p_result) - result.check_error(errno) - for row in result.rows_iter(): - ts, n = row() - print(ts, n) - - -def test_subscribe_callback(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_subscribe_callback" - try: - conn.exec("drop database if exists %s" % dbname) - conn.exec("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.exec("create table if not exists log(ts timestamp, n int)") - - print("# subscribe with callback") - sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback) - - for i in range(10): - conn.exec("insert into log values(now, %d)" % i) - time.sleep(0.7) - sub.close() - - conn.exec("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.exec("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_subscribe_callback(connect()) - -``` - -### Statement API - Stream - -```python -from taos import * -from ctypes import * - -def stream_callback(p_param, p_result, p_row): - # type: (c_void_p, c_void_p, c_void_p) -> None - - if p_result is None or p_row is None: - return - result = TaosResult(p_result) - row = TaosRow(result, p_row) - try: - ts, count = row() - p = cast(p_param, POINTER(Counter)) - p.contents.count += count - print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) - - except Exception as err: - print(err) - raise err - - -class Counter(ctypes.Structure): - _fields_ = [ - ("count", c_int), - ] - - def __str__(self): - return "%d" % self.count - - -def test_stream(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_stream" - try: - conn.exec("drop database if exists %s" % dbname) - conn.exec("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.exec("create table if not exists log(ts timestamp, n int)") - - result = conn.query("select count(*) from log interval(5s)") - assert result.field_count == 2 - counter = Counter() - counter.count = 0 - stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter)) - - for _ in range(0, 20): - conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") - time.sleep(2) - stream.close() - conn.exec("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.exec("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_stream(connect()) -``` - -### Insert with line protocol - -```python -import taos - -conn = taos.connect() -dbname = "pytest_line" -conn.exec("drop database if exists %s" % dbname) -conn.exec("create database if not exists %s precision 'us'" % dbname) -conn.select_db(dbname) - -lines = [ - 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000', - 'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000', - 'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000', -] -conn.schemaless_insert(lines, 0, "ns") -print("inserted") - -lines = [ - 'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000', -] -conn.schemaless_insert(lines, 0, "ns") - -result = conn.query("show tables") -for row in result: - print(row) -result.close() - - -conn.exec("drop database if exists %s" % dbname) -conn.close() - -``` - -## License - AGPL-3.0 - -Keep same with [TDengine](https://github.com/taosdata/TDengine). -""" -from .connection import TaosConnection - -# For some reason, the following is needed for VS Code (through PyLance) to -# recognize that "error" is a valid module of the "taos" package. -from .error import * -from .bind import * -from .field import * -from .cursor import * -from .result import * -from .statement import * -from .subscription import * -from .schemaless import * - -from taos._version import __version__ - -# Globals -threadsafety = 0 -paramstyle = "pyformat" - -__all__ = [ - "__version__", - # functions - "connect", - "new_bind_param", - "new_bind_params", - "new_multi_binds", - "new_multi_bind", - # objects - "TaosBind", - "TaosConnection", - "TaosCursor", - "TaosResult", - "TaosRows", - "TaosRow", - "TaosStmt", - "PrecisionEnum", - "SmlPrecision", - "SmlProtocol" -] - -def connect(*args, **kwargs): - # type: (..., ...) -> TaosConnection - """Function to return a TDengine connector object - - Current supporting keyword parameters: - @dsn: Data source name as string - @user: Username as string(optional) - @password: Password as string(optional) - @host: Hostname(optional) - @database: Database name(optional) - - @rtype: TDengineConnector - """ - return TaosConnection(*args, **kwargs) diff --git a/src/connector/python/taos/_version.py b/src/connector/python/taos/_version.py deleted file mode 100644 index f811561263c557cf534e90ff763373bccacb20b6..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/_version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = '2.1.2' diff --git a/src/connector/python/taos/bind.py b/src/connector/python/taos/bind.py deleted file mode 100644 index 8f39278c960c285f4a8c0bfc1d8b198bb4a56f4c..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/bind.py +++ /dev/null @@ -1,457 +0,0 @@ -# encoding:UTF-8 -import ctypes -from .constants import FieldType -from .error import * -from .precision import * -from datetime import datetime -from ctypes import * -import sys - -_datetime_epoch = datetime.utcfromtimestamp(0) - -def _is_not_none(obj): - return obj != None - -class TaosBind(ctypes.Structure): - _fields_ = [ - ("buffer_type", c_int), - ("buffer", c_void_p), - ("buffer_length", c_size_t), - ("length", POINTER(c_size_t)), - ("is_null", POINTER(c_int)), - ("is_unsigned", c_int), - ("error", POINTER(c_int)), - ("u", c_int64), - ("allocated", c_int), - ] - - def null(self): - self.buffer_type = FieldType.C_NULL - self.is_null = pointer(c_int(1)) - - def bool(self, value): - self.buffer_type = FieldType.C_BOOL - self.buffer = cast(pointer(c_bool(value)), c_void_p) - self.buffer_length = sizeof(c_bool) - - def tinyint(self, value): - self.buffer_type = FieldType.C_TINYINT - self.buffer = cast(pointer(c_int8(value)), c_void_p) - self.buffer_length = sizeof(c_int8) - - def smallint(self, value): - self.buffer_type = FieldType.C_SMALLINT - self.buffer = cast(pointer(c_int16(value)), c_void_p) - self.buffer_length = sizeof(c_int16) - - def int(self, value): - self.buffer_type = FieldType.C_INT - self.buffer = cast(pointer(c_int32(value)), c_void_p) - self.buffer_length = sizeof(c_int32) - - def bigint(self, value): - self.buffer_type = FieldType.C_BIGINT - self.buffer = cast(pointer(c_int64(value)), c_void_p) - self.buffer_length = sizeof(c_int64) - - def float(self, value): - self.buffer_type = FieldType.C_FLOAT - self.buffer = cast(pointer(c_float(value)), c_void_p) - self.buffer_length = sizeof(c_float) - - def double(self, value): - self.buffer_type = FieldType.C_DOUBLE - self.buffer = cast(pointer(c_double(value)), c_void_p) - self.buffer_length = sizeof(c_double) - - def binary(self, value): - buffer = None - length = 0 - if isinstance(value, str): - bytes = value.encode("utf-8") - buffer = create_string_buffer(bytes) - length = len(bytes) - else: - buffer = value - length = len(value) - self.buffer_type = FieldType.C_BINARY - self.buffer = cast(buffer, c_void_p) - self.buffer_length = length - self.length = pointer(c_size_t(self.buffer_length)) - - def timestamp(self, value, precision=PrecisionEnum.Milliseconds): - if type(value) is datetime: - if precision == PrecisionEnum.Milliseconds: - ts = int(round((value - _datetime_epoch).total_seconds() * 1000)) - elif precision == PrecisionEnum.Microseconds: - ts = int(round((value - _datetime_epoch).total_seconds() * 10000000)) - else: - raise PrecisionError("datetime do not support nanosecond precision") - elif type(value) is float: - if precision == PrecisionEnum.Milliseconds: - ts = int(round(value * 1000)) - elif precision == PrecisionEnum.Microseconds: - ts = int(round(value * 10000000)) - else: - raise PrecisionError("time float do not support nanosecond precision") - elif isinstance(value, int) and not isinstance(value, bool): - ts = value - elif isinstance(value, str): - value = datetime.fromisoformat(value) - if precision == PrecisionEnum.Milliseconds: - ts = int(round(value * 1000)) - elif precision == PrecisionEnum.Microseconds: - ts = int(round(value * 10000000)) - else: - raise PrecisionError("datetime do not support nanosecond precision") - - self.buffer_type = FieldType.C_TIMESTAMP - self.buffer = cast(pointer(c_int64(ts)), c_void_p) - self.buffer_length = sizeof(c_int64) - - def nchar(self, value): - buffer = None - length = 0 - if isinstance(value, str): - bytes = value.encode("utf-8") - buffer = create_string_buffer(bytes) - length = len(bytes) - else: - buffer = value - length = len(value) - self.buffer_type = FieldType.C_NCHAR - self.buffer = cast(buffer, c_void_p) - self.buffer_length = length - self.length = pointer(c_size_t(self.buffer_length)) - - def json(self, value): - buffer = None - length = 0 - if isinstance(value, str): - bytes = value.encode("utf-8") - buffer = create_string_buffer(bytes) - length = len(bytes) - else: - buffer = value - length = len(value) - self.buffer_type = FieldType.C_JSON - self.buffer = cast(buffer, c_void_p) - self.buffer_length = length - self.length = pointer(c_size_t(self.buffer_length)) - - def tinyint_unsigned(self, value): - self.buffer_type = FieldType.C_TINYINT_UNSIGNED - self.buffer = cast(pointer(c_uint8(value)), c_void_p) - self.buffer_length = sizeof(c_uint8) - - def smallint_unsigned(self, value): - self.buffer_type = FieldType.C_SMALLINT_UNSIGNED - self.buffer = cast(pointer(c_uint16(value)), c_void_p) - self.buffer_length = sizeof(c_uint16) - - def int_unsigned(self, value): - self.buffer_type = FieldType.C_INT_UNSIGNED - self.buffer = cast(pointer(c_uint32(value)), c_void_p) - self.buffer_length = sizeof(c_uint32) - - def bigint_unsigned(self, value): - self.buffer_type = FieldType.C_BIGINT_UNSIGNED - self.buffer = cast(pointer(c_uint64(value)), c_void_p) - self.buffer_length = sizeof(c_uint64) - - -def _datetime_to_timestamp(value, precision): - # type: (datetime | float | int | str | c_int64, PrecisionEnum) -> c_int64 - if value is None: - return FieldType.C_BIGINT_NULL - if type(value) is datetime: - if precision == PrecisionEnum.Milliseconds: - return int(round((value - _datetime_epoch).total_seconds() * 1000)) - elif precision == PrecisionEnum.Microseconds: - return int(round((value - _datetime_epoch).total_seconds() * 10000000)) - else: - raise PrecisionError("datetime do not support nanosecond precision") - elif type(value) is float: - if precision == PrecisionEnum.Milliseconds: - return int(round(value * 1000)) - elif precision == PrecisionEnum.Microseconds: - return int(round(value * 10000000)) - else: - raise PrecisionError("time float do not support nanosecond precision") - elif isinstance(value, int) and not isinstance(value, bool): - return c_int64(value) - elif isinstance(value, str): - value = datetime.fromisoformat(value) - if precision == PrecisionEnum.Milliseconds: - return int(round(value * 1000)) - elif precision == PrecisionEnum.Microseconds: - return int(round(value * 10000000)) - else: - raise PrecisionError("datetime do not support nanosecond precision") - elif isinstance(value, c_int64): - return value - return FieldType.C_BIGINT_NULL - - -class TaosMultiBind(ctypes.Structure): - _fields_ = [ - ("buffer_type", c_int), - ("buffer", c_void_p), - ("buffer_length", c_size_t), - ("length", POINTER(c_int32)), - ("is_null", c_char_p), - ("num", c_int), - ] - - def null(self, num): - self.buffer_type = FieldType.C_NULL - self.is_null = cast((c_char * num)(*[1 for _ in range(num)]), c_char_p) - self.buffer = c_void_p(None) - self.num = num - - def bool(self, values): - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int8 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_BOOL_NULL for v in values]) - - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - self.buffer_type = FieldType.C_BOOL - self.buffer_length = sizeof(c_bool) - - def tinyint(self, values): - self.buffer_type = FieldType.C_TINYINT - self.buffer_length = sizeof(c_int8) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int8 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_NULL for v in values]) - - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def smallint(self, values): - self.buffer_type = FieldType.C_SMALLINT - self.buffer_length = sizeof(c_int16) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int16 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def int(self, values): - self.buffer_type = FieldType.C_INT - self.buffer_length = sizeof(c_int32) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int32 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_INT_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def bigint(self, values): - self.buffer_type = FieldType.C_BIGINT - self.buffer_length = sizeof(c_int64) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int64 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def float(self, values): - self.buffer_type = FieldType.C_FLOAT - self.buffer_length = sizeof(c_float) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_float * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_FLOAT_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def double(self, values): - self.buffer_type = FieldType.C_DOUBLE - self.buffer_length = sizeof(c_double) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_double * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_DOUBLE_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def _str_to_buffer(self, values): - self.num = len(values) - is_null = [1 if v is None else 0 for v in values] - self.is_null = cast((c_byte * self.num)(*is_null), c_char_p) - - if sum(is_null) == self.num: - self.length = (c_int32 * len(values))(0 * self.num) - return - if sys.version_info < (3, 0): - _bytes = [bytes(value) if value is not None else None for value in values] - buffer_length = max(len(b) + 1 for b in _bytes if b is not None) - buffers = [ - create_string_buffer(b, buffer_length) if b is not None else create_string_buffer(buffer_length) - for b in _bytes - ] - buffer_all = b''.join(v[:] for v in buffers) - self.buffer = cast(c_char_p(buffer_all), c_void_p) - else: - _bytes = [value.encode("utf-8") if value is not None else None for value in values] - buffer_length = max(len(b) for b in _bytes if b is not None) - self.buffer = cast( - c_char_p( - b"".join( - [ - create_string_buffer(b, buffer_length) - if b is not None - else create_string_buffer(buffer_length) - for b in _bytes - ] - ) - ), - c_void_p, - ) - self.length = (c_int32 * len(values))(*[len(b) if b is not None else 0 for b in _bytes]) - self.buffer_length = buffer_length - def binary(self, values): - self.buffer_type = FieldType.C_BINARY - self._str_to_buffer(values) - - def timestamp(self, values, precision=PrecisionEnum.Milliseconds): - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int64 * len(values) - buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values]) - - self.buffer_type = FieldType.C_TIMESTAMP - self.buffer = cast(buffer, c_void_p) - self.buffer_length = sizeof(c_int64) - self.num = len(values) - - def nchar(self, values): - # type: (list[str]) -> None - self.buffer_type = FieldType.C_NCHAR - self._str_to_buffer(values) - - def json(self, values): - # type: (list[str]) -> None - self.buffer_type = FieldType.C_JSON - self._str_to_buffer(values) - - def tinyint_unsigned(self, values): - self.buffer_type = FieldType.C_TINYINT_UNSIGNED - self.buffer_length = sizeof(c_uint8) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_uint8 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_UNSIGNED_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def smallint_unsigned(self, values): - self.buffer_type = FieldType.C_SMALLINT_UNSIGNED - self.buffer_length = sizeof(c_uint16) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_uint16 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_UNSIGNED_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def int_unsigned(self, values): - self.buffer_type = FieldType.C_INT_UNSIGNED - self.buffer_length = sizeof(c_uint32) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_uint32 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_INT_UNSIGNED_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - def bigint_unsigned(self, values): - self.buffer_type = FieldType.C_BIGINT_UNSIGNED - self.buffer_length = sizeof(c_uint64) - - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_uint64 * len(values) - try: - buffer = buffer_type(*values) - except: - buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_UNSIGNED_NULL for v in values]) - self.buffer = cast(buffer, c_void_p) - self.num = len(values) - - -def new_bind_param(): - # type: () -> TaosBind - return TaosBind() - - -def new_bind_params(size): - # type: (int) -> Array[TaosBind] - return (TaosBind * size)() - - -def new_multi_bind(): - # type: () -> TaosMultiBind - return TaosMultiBind() - - -def new_multi_binds(size): - # type: (int) -> Array[TaosMultiBind] - return (TaosMultiBind * size)() diff --git a/src/connector/python/taos/cinterface.py b/src/connector/python/taos/cinterface.py deleted file mode 100644 index be39d2291a908b9349599ba13e92a205696516c7..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/cinterface.py +++ /dev/null @@ -1,921 +0,0 @@ -# encoding:UTF-8 - -import ctypes -import platform -import inspect -from ctypes import * - -try: - from typing import Any -except: - pass - -from .error import * -from .bind import * -from .field import * -from .schemaless import * - -_UNSUPPORTED = {} - -# stream callback -stream_callback_type = CFUNCTYPE(None, c_void_p, c_void_p, c_void_p) -stream_callback2_type = CFUNCTYPE(None, c_void_p) - -# C interface class -class TaosOption: - Locale = (0,) - Charset = (1,) - Timezone = (2,) - ConfigDir = (3,) - ShellActivityTimer = (4,) - MaxOptions = (5,) - - -def _load_taos_linux(): - return ctypes.CDLL("libtaos.so") - - -def _load_taos_darwin(): - return ctypes.CDLL("libtaos.dylib") - - -def _load_taos_windows(): - return ctypes.windll.LoadLibrary("taos") - - -def _load_taos(): - load_func = { - "Linux": _load_taos_linux, - "Darwin": _load_taos_darwin, - "Windows": _load_taos_windows, - } - pf = platform.system() - if load_func[pf] is None: - raise InterfaceError("unsupported platform: %s" % pf) - try: - return load_func[pf]() - except Exception as err: - raise InterfaceError("unable to load taos C library: %s" % err) - - -_libtaos = _load_taos() - -_libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) -_libtaos.taos_init.restype = None -_libtaos.taos_connect.restype = ctypes.c_void_p -_libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) -_libtaos.taos_errstr.restype = ctypes.c_char_p -_libtaos.taos_subscribe.restype = ctypes.c_void_p -_libtaos.taos_consume.restype = ctypes.c_void_p -_libtaos.taos_fetch_lengths.restype = ctypes.POINTER(ctypes.c_int) -_libtaos.taos_free_result.restype = None -_libtaos.taos_query.restype = ctypes.POINTER(ctypes.c_void_p) - -try: - _libtaos.taos_stmt_errstr.restype = c_char_p -except AttributeError: - None -finally: - None - - -_libtaos.taos_options.restype = None - - -def taos_options(option, *args): - # type: (TaosOption, Any) -> None - _libtaos.taos_options(option, *args) - - -def taos_init(): - # type: () -> None - """ - C: taos_init - """ - _libtaos.taos_init() - - -_libtaos.taos_cleanup.restype = None - - -def taos_cleanup(): - # type: () -> None - """Cleanup workspace.""" - _libtaos.taos_cleanup() - - -_libtaos.taos_get_client_info.restype = c_char_p - - -def taos_get_client_info(): - # type: () -> str - """Get client version info.""" - return _libtaos.taos_get_client_info().decode("utf-8") - - -_libtaos.taos_get_server_info.restype = c_char_p -_libtaos.taos_get_server_info.argtypes = (c_void_p,) - - -def taos_get_server_info(connection): - # type: (c_void_p) -> str - """Get server version as string.""" - return _libtaos.taos_get_server_info(connection).decode("utf-8") - - -_libtaos.taos_close.restype = None -_libtaos.taos_close.argtypes = (c_void_p,) - - -def taos_close(connection): - # type: (c_void_p) -> None - """Close the TAOS* connection""" - _libtaos.taos_close(connection) - - -_libtaos.taos_connect.restype = c_void_p -_libtaos.taos_connect.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_uint16 - - -def taos_connect(host=None, user="root", password="taosdata", db=None, port=0): - # type: (None|str, str, str, None|str, int) -> c_void_p - """Create TDengine database connection. - - - host: server hostname/FQDN - - user: user name - - password: user password - - db: database name (optional) - - port: server port - - @rtype: c_void_p, TDengine handle - """ - # host - try: - _host = c_char_p(host.encode("utf-8")) if host is not None else None - except AttributeError: - raise AttributeError("host is expected as a str") - - # user - try: - _user = c_char_p(user.encode("utf-8")) - except AttributeError: - raise AttributeError("user is expected as a str") - - # password - try: - _password = c_char_p(password.encode("utf-8")) - except AttributeError: - raise AttributeError("password is expected as a str") - - # db - try: - _db = c_char_p(db.encode("utf-8")) if db is not None else None - except AttributeError: - raise AttributeError("db is expected as a str") - - # port - try: - _port = c_uint16(port) - except TypeError: - raise TypeError("port is expected as an uint16") - - connection = cast(_libtaos.taos_connect(_host, _user, _password, _db, _port), c_void_p) - - if connection.value is None: - raise ConnectionError("connect to TDengine failed") - return connection - - -_libtaos.taos_connect_auth.restype = c_void_p -_libtaos.taos_connect_auth.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_uint16 - -_libtaos.taos_connect_auth.restype = c_void_p -_libtaos.taos_connect_auth.argtypes = c_char_p, c_char_p, c_char_p, c_char_p, c_uint16 - - -def taos_connect_auth(host=None, user="root", auth="", db=None, port=0): - # type: (None|str, str, str, None|str, int) -> c_void_p - """Connect server with auth token. - - - host: server hostname/FQDN - - user: user name - - auth: base64 encoded auth token - - db: database name (optional) - - port: server port - - @rtype: c_void_p, TDengine handle - """ - # host - try: - _host = c_char_p(host.encode("utf-8")) if host is not None else None - except AttributeError: - raise AttributeError("host is expected as a str") - - # user - try: - _user = c_char_p(user.encode("utf-8")) - except AttributeError: - raise AttributeError("user is expected as a str") - - # auth - try: - _auth = c_char_p(auth.encode("utf-8")) - except AttributeError: - raise AttributeError("password is expected as a str") - - # db - try: - _db = c_char_p(db.encode("utf-8")) if db is not None else None - except AttributeError: - raise AttributeError("db is expected as a str") - - # port - try: - _port = c_int(port) - except TypeError: - raise TypeError("port is expected as an int") - - connection = c_void_p(_libtaos.taos_connect_auth(_host, _user, _auth, _db, _port)) - - if connection.value is None: - raise ConnectionError("connect to TDengine failed") - return connection - - -_libtaos.taos_query.restype = c_void_p -_libtaos.taos_query.argtypes = c_void_p, c_char_p - - -def taos_query(connection, sql): - # type: (c_void_p, str) -> c_void_p - """Run SQL - - - sql: str, sql string to run - - @return: TAOS_RES*, result pointer - - """ - try: - ptr = c_char_p(sql.encode("utf-8")) - res = c_void_p(_libtaos.taos_query(connection, ptr)) - errno = taos_errno(res) - if errno != 0: - errstr = taos_errstr(res) - taos_free_result(res) - raise ProgrammingError(errstr, errno) - return res - except AttributeError: - raise AttributeError("sql is expected as a string") - - -async_query_callback_type = CFUNCTYPE(None, c_void_p, c_void_p, c_int) -_libtaos.taos_query_a.restype = None -_libtaos.taos_query_a.argtypes = c_void_p, c_char_p, async_query_callback_type, c_void_p - - -def taos_query_a(connection, sql, callback, param): - # type: (c_void_p, str, async_query_callback_type, c_void_p) -> c_void_p - _libtaos.taos_query_a(connection, c_char_p(sql.encode("utf-8")), async_query_callback_type(callback), param) - - -async_fetch_rows_callback_type = CFUNCTYPE(None, c_void_p, c_void_p, c_int) -_libtaos.taos_fetch_rows_a.restype = None -_libtaos.taos_fetch_rows_a.argtypes = c_void_p, async_fetch_rows_callback_type, c_void_p - - -def taos_fetch_rows_a(result, callback, param): - # type: (c_void_p, async_fetch_rows_callback_type, c_void_p) -> c_void_p - _libtaos.taos_fetch_rows_a(result, async_fetch_rows_callback_type(callback), param) - - -def taos_affected_rows(result): - # type: (c_void_p) -> c_int - """The affected rows after runing query""" - return _libtaos.taos_affected_rows(result) - - -subscribe_callback_type = CFUNCTYPE(None, c_void_p, c_void_p, c_void_p, c_int) -_libtaos.taos_subscribe.restype = c_void_p -# _libtaos.taos_subscribe.argtypes = c_void_p, c_int, c_char_p, c_char_p, subscribe_callback_type, c_void_p, c_int - - -def taos_subscribe(connection, restart, topic, sql, interval, callback=None, param=None): - # type: (c_void_p, bool, str, str, c_int, subscribe_callback_type, c_void_p | None) -> c_void_p - """Create a subscription - @restart boolean, - @sql string, sql statement for data query, must be a 'select' statement. - @topic string, name of this subscription - """ - if callback != None: - callback = subscribe_callback_type(callback) - return c_void_p( - _libtaos.taos_subscribe( - connection, - 1 if restart else 0, - c_char_p(topic.encode("utf-8")), - c_char_p(sql.encode("utf-8")), - callback, - c_void_p(param), - interval, - ) - ) - - -_libtaos.taos_consume.restype = c_void_p -_libtaos.taos_consume.argstype = (c_void_p,) - - -def taos_consume(sub): - """Consume data of a subscription""" - return c_void_p(_libtaos.taos_consume(sub)) - - -_libtaos.taos_unsubscribe.restype = None -_libtaos.taos_unsubscribe.argstype = c_void_p, c_int - - -def taos_unsubscribe(sub, keep_progress): - """Cancel a subscription""" - _libtaos.taos_unsubscribe(sub, 1 if keep_progress else 0) - - -def taos_use_result(result): - """Use result after calling self.query, it's just for 1.6.""" - fields = [] - pfields = taos_fetch_fields_raw(result) - for i in range(taos_field_count(result)): - fields.append( - { - "name": pfields[i].name, - "bytes": pfields[i].bytes, - "type": pfields[i].type, - } - ) - - return fields - - -_libtaos.taos_fetch_block.restype = c_int -_libtaos.taos_fetch_block.argtypes = c_void_p, c_void_p - - -def taos_fetch_block_raw(result): - pblock = ctypes.c_void_p(0) - num_of_rows = _libtaos.taos_fetch_block(result, ctypes.byref(pblock)) - if num_of_rows == 0: - return None, 0 - return pblock, abs(num_of_rows) - - -def taos_fetch_block(result, fields=None, field_count=None): - pblock = ctypes.c_void_p(0) - num_of_rows = _libtaos.taos_fetch_block(result, ctypes.byref(pblock)) - if num_of_rows == 0: - return None, 0 - precision = taos_result_precision(result) - if fields is None: - fields = taos_fetch_fields(result) - if field_count is None: - field_count = taos_field_count(result) - blocks = [None] * field_count - fieldLen = taos_fetch_lengths(result, field_count) - for i in range(len(fields)): - data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] - if fields[i]["type"] not in CONVERT_FUNC: - raise DatabaseError("Invalid data type returned from database") - blocks[i] = CONVERT_FUNC_BLOCK[fields[i]["type"]](data, num_of_rows, fieldLen[i], precision) - - return blocks, abs(num_of_rows) - - -_libtaos.taos_fetch_row.restype = c_void_p -_libtaos.taos_fetch_row.argtypes = (c_void_p,) - - -def taos_fetch_row_raw(result): - # type: (c_void_p) -> c_void_p - row = c_void_p(_libtaos.taos_fetch_row(result)) - if row: - return row - return None - - -def taos_fetch_row(result, fields): - # type: (c_void_p, Array[TaosField]) -> tuple(c_void_p, int) - pblock = ctypes.c_void_p(0) - pblock = taos_fetch_row_raw(result) - if pblock: - num_of_rows = 1 - precision = taos_result_precision(result) - field_count = taos_field_count(result) - blocks = [None] * field_count - field_lens = taos_fetch_lengths(result, field_count) - for i in range(field_count): - data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] - if fields[i].type not in CONVERT_FUNC: - raise DatabaseError("Invalid data type returned from database") - if data is None: - blocks[i] = [None] - else: - blocks[i] = CONVERT_FUNC[fields[i].type](data, num_of_rows, field_lens[i], precision) - else: - return None, 0 - return blocks, abs(num_of_rows) - - -_libtaos.taos_free_result.argtypes = (c_void_p,) - - -def taos_free_result(result): - # type: (c_void_p) -> None - if result != None: - _libtaos.taos_free_result(result) - - -_libtaos.taos_field_count.restype = c_int -_libtaos.taos_field_count.argstype = (c_void_p,) - - -def taos_field_count(result): - # type: (c_void_p) -> int - return _libtaos.taos_field_count(result) - - -def taos_num_fields(result): - # type: (c_void_p) -> int - return _libtaos.taos_num_fields(result) - - -_libtaos.taos_fetch_fields.restype = c_void_p -_libtaos.taos_fetch_fields.argstype = (c_void_p,) - - -def taos_fetch_fields_raw(result): - # type: (c_void_p) -> c_void_p - return c_void_p(_libtaos.taos_fetch_fields(result)) - - -def taos_fetch_fields(result): - # type: (c_void_p) -> TaosFields - fields = taos_fetch_fields_raw(result) - count = taos_field_count(result) - return TaosFields(fields, count) - - -def taos_fetch_lengths(result, field_count=None): - # type: (c_void_p, int) -> Array[int] - """Make sure to call taos_fetch_row or taos_fetch_block before fetch_lengths""" - lens = _libtaos.taos_fetch_lengths(result) - if field_count is None: - field_count = taos_field_count(result) - if not lens: - raise OperationalError("field length empty, use taos_fetch_row/block before it") - return lens[:field_count] - - -def taos_result_precision(result): - # type: (c_void_p) -> c_int - return _libtaos.taos_result_precision(result) - - -_libtaos.taos_errno.restype = c_int -_libtaos.taos_errno.argstype = (c_void_p,) - - -def taos_errno(result): - # type: (ctypes.c_void_p) -> c_int - """Return the error number.""" - return _libtaos.taos_errno(result) - - -_libtaos.taos_errstr.restype = c_char_p -_libtaos.taos_errstr.argstype = (c_void_p,) - - -def taos_errstr(result=c_void_p(None)): - # type: (ctypes.c_void_p) -> str - """Return the error styring""" - return _libtaos.taos_errstr(result).decode("utf-8") - - -_libtaos.taos_stop_query.restype = None -_libtaos.taos_stop_query.argstype = (c_void_p,) - - -def taos_stop_query(result): - # type: (ctypes.c_void_p) -> None - """Stop current query""" - return _libtaos.taos_stop_query(result) - - -try: - _libtaos.taos_load_table_info.restype = c_int - _libtaos.taos_load_table_info.argstype = (c_void_p, c_char_p) -except Exception as err: - _UNSUPPORTED["taos_open_stream"] = err - - -def taos_load_table_info(connection, tables): - # type: (ctypes.c_void_p, str) -> None - """Stop current query""" - _check_if_supported() - errno = _libtaos.taos_load_table_info(connection, c_char_p(tables.encode("utf-8"))) - if errno != 0: - msg = taos_errstr() - raise OperationalError(msg, errno) - - -_libtaos.taos_validate_sql.restype = c_int -_libtaos.taos_validate_sql.argstype = (c_void_p, c_char_p) - - -def taos_validate_sql(connection, sql): - # type: (ctypes.c_void_p, str) -> None | str - """Get taosd server info""" - errno = _libtaos.taos_validate_sql(connection, ctypes.c_char_p(sql.encode("utf-8"))) - if errno != 0: - msg = taos_errstr() - return msg - return None - - -_libtaos.taos_print_row.restype = c_int -_libtaos.taos_print_row.argstype = (c_char_p, c_void_p, c_void_p, c_int) - - -def taos_print_row(row, fields, num_fields, buffer_size=4096): - # type: (ctypes.c_void_p, ctypes.c_void_p | TaosFields, int, int) -> str - """Print an row to string""" - p = ctypes.create_string_buffer(buffer_size) - if isinstance(fields, TaosFields): - _libtaos.taos_print_row(p, row, fields.as_ptr(), num_fields) - else: - _libtaos.taos_print_row(p, row, fields, num_fields) - if p: - return p.value.decode("utf-8") - raise OperationalError("taos_print_row failed") - - -_libtaos.taos_select_db.restype = c_int -_libtaos.taos_select_db.argstype = (c_void_p, c_char_p) - - -def taos_select_db(connection, db): - # type: (ctypes.c_void_p, str) -> None - """Select database, eq to sql: use """ - res = _libtaos.taos_select_db(connection, ctypes.c_char_p(db.encode("utf-8"))) - if res != 0: - raise DatabaseError("select database error", res) - - -try: - _libtaos.taos_open_stream.restype = c_void_p - _libtaos.taos_open_stream.argstype = c_void_p, c_char_p, stream_callback_type, c_int64, c_void_p, Any -except Exception as err: - _UNSUPPORTED["taos_open_stream"] = err - - -def taos_open_stream(connection, sql, callback, stime=0, param=None, callback2=None): - # type: (ctypes.c_void_p, str, stream_callback_type, c_int64, c_void_p, c_void_p) -> ctypes.pointer - _check_if_supported() - if callback2 != None: - callback2 = stream_callback2_type(callback2) - """Open an stream""" - return c_void_p( - _libtaos.taos_open_stream( - connection, ctypes.c_char_p(sql.encode("utf-8")), stream_callback_type(callback), stime, param, callback2 - ) - ) - - -_libtaos.taos_close_stream.restype = None -_libtaos.taos_close_stream.argstype = (c_void_p,) - - -def taos_close_stream(stream): - # type: (c_void_p) -> None - """Open an stream""" - return _libtaos.taos_close_stream(stream) - - -_libtaos.taos_stmt_init.restype = c_void_p -_libtaos.taos_stmt_init.argstype = (c_void_p,) - - -def taos_stmt_init(connection): - # type: (c_void_p) -> (c_void_p) - """Create a statement query - @param(connection): c_void_p TAOS* - @rtype: c_void_p, *TAOS_STMT - """ - return c_void_p(_libtaos.taos_stmt_init(connection)) - - -_libtaos.taos_stmt_prepare.restype = c_int -_libtaos.taos_stmt_prepare.argstype = (c_void_p, c_char_p, c_int) - - -def taos_stmt_prepare(stmt, sql): - # type: (ctypes.c_void_p, str) -> None - """Prepare a statement query - @stmt: c_void_p TAOS_STMT* - """ - buffer = sql.encode("utf-8") - res = _libtaos.taos_stmt_prepare(stmt, ctypes.c_char_p(buffer), len(buffer)) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -_libtaos.taos_stmt_close.restype = c_int -_libtaos.taos_stmt_close.argstype = (c_void_p,) - - -def taos_stmt_close(stmt): - # type: (ctypes.c_void_p) -> None - """Close a statement query - @stmt: c_void_p TAOS_STMT* - """ - res = _libtaos.taos_stmt_close(stmt) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -try: - _libtaos.taos_stmt_errstr.restype = c_char_p - _libtaos.taos_stmt_errstr.argstype = (c_void_p,) -except Exception as err: - _UNSUPPORTED["taos_stmt_set_tbname"] = err - - -def taos_stmt_errstr(stmt): - # type: (ctypes.c_void_p) -> str - """Get error message from stetement query - @stmt: c_void_p TAOS_STMT* - """ - _check_if_supported() - err = c_char_p(_libtaos.taos_stmt_errstr(stmt)) - if err: - return err.value.decode("utf-8") - - -try: - _libtaos.taos_stmt_set_tbname.restype = c_int - _libtaos.taos_stmt_set_tbname.argstype = (c_void_p, c_char_p) -except Exception as err: - _UNSUPPORTED["taos_stmt_set_tbname"] = err - - -def taos_stmt_set_tbname(stmt, name): - # type: (ctypes.c_void_p, str) -> None - """Set table name of a statement query if exists. - @stmt: c_void_p TAOS_STMT* - """ - _check_if_supported() - res = _libtaos.taos_stmt_set_tbname(stmt, c_char_p(name.encode("utf-8"))) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -try: - _libtaos.taos_stmt_set_tbname_tags.restype = c_int - _libtaos.taos_stmt_set_tbname_tags.argstype = (c_void_p, c_char_p, c_void_p) -except Exception as err: - _UNSUPPORTED["taos_stmt_set_tbname_tags"] = err - - -def taos_stmt_set_tbname_tags(stmt, name, tags): - # type: (c_void_p, str, c_void_p) -> None - """Set table name with tags bind params. - @stmt: c_void_p TAOS_STMT* - """ - _check_if_supported() - res = _libtaos.taos_stmt_set_tbname_tags(stmt, ctypes.c_char_p(name.encode("utf-8")), tags) - - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -_libtaos.taos_stmt_is_insert.restype = c_int -_libtaos.taos_stmt_is_insert.argstype = (c_void_p, POINTER(c_int)) - - -def taos_stmt_is_insert(stmt): - # type: (ctypes.c_void_p) -> bool - """Set table name with tags bind params. - @stmt: c_void_p TAOS_STMT* - """ - is_insert = ctypes.c_int() - res = _libtaos.taos_stmt_is_insert(stmt, ctypes.byref(is_insert)) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - return is_insert == 0 - - -_libtaos.taos_stmt_num_params.restype = c_int -_libtaos.taos_stmt_num_params.argstype = (c_void_p, POINTER(c_int)) - - -def taos_stmt_num_params(stmt): - # type: (ctypes.c_void_p) -> int - """Params number of the current statement query. - @stmt: TAOS_STMT* - """ - num_params = ctypes.c_int() - res = _libtaos.taos_stmt_num_params(stmt, ctypes.byref(num_params)) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - return num_params.value - - -_libtaos.taos_stmt_bind_param.restype = c_int -_libtaos.taos_stmt_bind_param.argstype = (c_void_p, c_void_p) - - -def taos_stmt_bind_param(stmt, bind): - # type: (ctypes.c_void_p, Array[TaosBind]) -> None - """Bind params in the statement query. - @stmt: TAOS_STMT* - @bind: TAOS_BIND* - """ - # ptr = ctypes.cast(bind, POINTER(TaosBind)) - # ptr = pointer(bind) - res = _libtaos.taos_stmt_bind_param(stmt, bind) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -try: - _libtaos.taos_stmt_bind_param_batch.restype = c_int - _libtaos.taos_stmt_bind_param_batch.argstype = (c_void_p, c_void_p) -except Exception as err: - _UNSUPPORTED["taos_stmt_bind_param_batch"] = err - - -def taos_stmt_bind_param_batch(stmt, bind): - # type: (ctypes.c_void_p, Array[TaosMultiBind]) -> None - """Bind params in the statement query. - @stmt: TAOS_STMT* - @bind: TAOS_BIND* - """ - # ptr = ctypes.cast(bind, POINTER(TaosMultiBind)) - # ptr = pointer(bind) - _check_if_supported() - res = _libtaos.taos_stmt_bind_param_batch(stmt, bind) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -try: - _libtaos.taos_stmt_bind_single_param_batch.restype = c_int - _libtaos.taos_stmt_bind_single_param_batch.argstype = (c_void_p, c_void_p, c_int) -except Exception as err: - _UNSUPPORTED["taos_stmt_bind_single_param_batch"] = err - - -def taos_stmt_bind_single_param_batch(stmt, bind, col): - # type: (ctypes.c_void_p, Array[TaosMultiBind], c_int) -> None - """Bind params in the statement query. - @stmt: TAOS_STMT* - @bind: TAOS_MULTI_BIND* - @col: column index - """ - _check_if_supported() - res = _libtaos.taos_stmt_bind_single_param_batch(stmt, bind, col) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -_libtaos.taos_stmt_add_batch.restype = c_int -_libtaos.taos_stmt_add_batch.argstype = (c_void_p,) - - -def taos_stmt_add_batch(stmt): - # type: (ctypes.c_void_p) -> None - """Add current params into batch - @stmt: TAOS_STMT* - """ - res = _libtaos.taos_stmt_add_batch(stmt) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -_libtaos.taos_stmt_execute.restype = c_int -_libtaos.taos_stmt_execute.argstype = (c_void_p,) - - -def taos_stmt_execute(stmt): - # type: (ctypes.c_void_p) -> None - """Execute a statement query - @stmt: TAOS_STMT* - """ - res = _libtaos.taos_stmt_execute(stmt) - if res != 0: - raise StatementError(msg=taos_stmt_errstr(stmt), errno=res) - - -_libtaos.taos_stmt_use_result.restype = c_void_p -_libtaos.taos_stmt_use_result.argstype = (c_void_p,) - - -def taos_stmt_use_result(stmt): - # type: (ctypes.c_void_p) -> None - """Get result of the statement. - @stmt: TAOS_STMT* - """ - result = c_void_p(_libtaos.taos_stmt_use_result(stmt)) - if result is None: - raise StatementError(taos_stmt_errstr(stmt)) - return result - - -try: - _libtaos.taos_schemaless_insert.restype = c_void_p - _libtaos.taos_schemaless_insert.argstype = c_void_p, c_void_p, c_int, c_int, c_int -except Exception as err: - _UNSUPPORTED["taos_schemaless_insert"] = err - - -def taos_schemaless_insert(connection, lines, protocol, precision): - # type: (c_void_p, list[str] | tuple(str), SmlProtocol, SmlPrecision) -> int - _check_if_supported() - num_of_lines = len(lines) - lines = (c_char_p(line.encode("utf-8")) for line in lines) - lines_type = ctypes.c_char_p * num_of_lines - p_lines = lines_type(*lines) - res = c_void_p(_libtaos.taos_schemaless_insert(connection, p_lines, num_of_lines, protocol, precision)) - errno = taos_errno(res) - affected_rows = taos_affected_rows(res) - if errno != 0: - errstr = taos_errstr(res) - taos_free_result(res) - raise SchemalessError(errstr, errno, affected_rows) - - taos_free_result(res) - return affected_rows - - -def _check_if_supported(): - func = inspect.stack()[1][3] - if func in _UNSUPPORTED: - raise InterfaceError("C function %s is not supported in v%s: %s" % (func, taos_get_client_info(), _UNSUPPORTED[func])) - - -def unsupported_methods(): - for m, e in range(_UNSUPPORTED): - print("unsupported %s: %s", m, e) - - -class CTaosInterface(object): - def __init__(self, config=None): - """ - Function to initialize the class - @host : str, hostname to connect - @user : str, username to connect to server - @password : str, password to connect to server - @db : str, default db to use when log in - @config : str, config directory - - @rtype : None - """ - if config is None: - self._config = ctypes.c_char_p(None) - else: - try: - self._config = ctypes.c_char_p(config.encode("utf-8")) - except AttributeError: - raise AttributeError("config is expected as a str") - - if config is not None: - taos_options(3, self._config) - - taos_init() - - @property - def config(self): - """Get current config""" - return self._config - - def connect(self, host=None, user="root", password="taosdata", db=None, port=0): - """ - Function to connect to server - - @rtype: c_void_p, TDengine handle - """ - return taos_connect(host, user, password, db, port) - - -if __name__ == "__main__": - cinter = CTaosInterface() - conn = cinter.connect() - result = cinter.query(conn, "show databases") - - print("Query Affected rows: {}".format(cinter.affected_rows(result))) - - fields = taos_fetch_fields_raw(result) - - data, num_of_rows = taos_fetch_block(result, fields) - - print(data) - - cinter.free_result(result) - cinter.close(conn) diff --git a/src/connector/python/taos/connection.py b/src/connector/python/taos/connection.py deleted file mode 100644 index dc8225ab33c84930214eb8f0d8ba47f6f31a5adf..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/connection.py +++ /dev/null @@ -1,204 +0,0 @@ -# encoding:UTF-8 -from types import FunctionType -from .cinterface import * -from .cursor import TaosCursor -from .subscription import TaosSubscription -from .statement import TaosStmt -from .stream import TaosStream -from .result import * - - -class TaosConnection(object): - """TDengine connection object""" - - def __init__(self, *args, **kwargs): - self._conn = None - self._host = None - self._user = "root" - self._password = "taosdata" - self._database = None - self._port = 0 - self._config = None - self._chandle = None - - self.config(**kwargs) - - def config(self, **kwargs): - # host - if "host" in kwargs: - self._host = kwargs["host"] - - # user - if "user" in kwargs: - self._user = kwargs["user"] - - # password - if "password" in kwargs: - self._password = kwargs["password"] - - # database - if "database" in kwargs: - self._database = kwargs["database"] - - # port - if "port" in kwargs: - self._port = kwargs["port"] - - # config - if "config" in kwargs: - self._config = kwargs["config"] - - self._chandle = CTaosInterface(self._config) - self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) - - def close(self): - """Close current connection.""" - if self._conn: - taos_close(self._conn) - self._conn = None - - @property - def client_info(self): - # type: () -> str - return taos_get_client_info() - - @property - def server_info(self): - # type: () -> str - return taos_get_server_info(self._conn) - - def select_db(self, database): - # type: (str) -> None - taos_select_db(self._conn, database) - - def execute(self, sql): - # type: (str) -> int - """Simplely execute sql ignoring the results""" - return self.query(sql).affected_rows - - def query(self, sql): - # type: (str) -> TaosResult - result = taos_query(self._conn, sql) - return TaosResult(result, True, self) - - def query_a(self, sql, callback, param): - # type: (str, async_query_callback_type, c_void_p) -> None - """Asynchronously query a sql with callback function""" - taos_query_a(self._conn, sql, callback, param) - - def subscribe(self, restart, topic, sql, interval, callback=None, param=None): - # type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription - """Create a subscription.""" - if self._conn is None: - return None - sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param) - return TaosSubscription(sub, callback != None) - - def statement(self, sql=None): - # type: (str | None) -> TaosStmt - if self._conn is None: - return None - stmt = taos_stmt_init(self._conn) - if sql != None: - taos_stmt_prepare(stmt, sql) - - return TaosStmt(stmt) - - def load_table_info(self, tables): - # type: (str) -> None - taos_load_table_info(self._conn, tables) - - def stream(self, sql, callback, stime=0, param=None, callback2=None): - # type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream - # cb = cast(callback, stream_callback_type) - # ref = byref(cb) - - stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2) - return TaosStream(stream) - - def schemaless_insert(self, lines, protocol, precision): - # type: (list[str], SmlProtocol, SmlPrecision) -> int - """ - 1.Line protocol and schemaless support - - ## Example - - ```python - import taos - conn = taos.connect() - conn.exec("drop database if exists test") - conn.select_db("test") - lines = [ - 'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532', - ] - conn.schemaless_insert(lines, 0, "ns") - ``` - - 2.OpenTSDB telnet style API format support - - ## Example - import taos - conn = taos.connect() - conn.exec("drop database if exists test") - conn.select_db("test") - lines = [ - 'cpu_load 1626056811855516532ns 2.0f32 id="tb1",host="host0",interface="eth0"', - ] - conn.schemaless_insert(lines, 1, None) - - - 3.OpenTSDB HTTP JSON format support - - ## Example - import taos - conn = taos.connect() - conn.exec("drop database if exists test") - conn.select_db("test") - payload = [''' - { - "metric": "cpu_load_0", - "timestamp": 1626006833610123, - "value": 55.5, - "tags": - { - "host": "ubuntu", - "interface": "eth0", - "Id": "tb0" - } - } - '''] - conn.schemaless_insert(lines, 2, None) - - """ - print(lines, protocol, precision) - return taos_schemaless_insert(self._conn, lines, protocol, precision) - - - def cursor(self): - # type: () -> TaosCursor - """Return a new Cursor object using the connection.""" - return TaosCursor(self) - - def commit(self): - """Commit any pending transaction to the database. - - Since TDengine do not support transactions, the implement is void functionality. - """ - pass - - def rollback(self): - """Void functionality""" - pass - - def clear_result_set(self): - """Clear unused result set on this connection.""" - pass - - def __del__(self): - self.close() - - -if __name__ == "__main__": - conn = TaosConnection() - conn.close() - print("Hello world") diff --git a/src/connector/python/taos/constants.py b/src/connector/python/taos/constants.py deleted file mode 100644 index 34044a15fc0cd73323552f1b4b8c280d6cad5a9b..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/constants.py +++ /dev/null @@ -1,47 +0,0 @@ -# encoding:UTF-8 - -"""Constants in TDengine python -""" - -import ctypes, struct - - -class FieldType(object): - """TDengine Field Types""" - - # type_code - C_NULL = 0 - C_BOOL = 1 - C_TINYINT = 2 - C_SMALLINT = 3 - C_INT = 4 - C_BIGINT = 5 - C_FLOAT = 6 - C_DOUBLE = 7 - C_BINARY = 8 - C_TIMESTAMP = 9 - C_NCHAR = 10 - C_TINYINT_UNSIGNED = 11 - C_SMALLINT_UNSIGNED = 12 - C_INT_UNSIGNED = 13 - C_BIGINT_UNSIGNED = 14 - C_JSON = 15 - # NULL value definition - # NOTE: These values should change according to C definition in tsdb.h - C_BOOL_NULL = 0x02 - C_TINYINT_NULL = -128 - C_TINYINT_UNSIGNED_NULL = 255 - C_SMALLINT_NULL = -32768 - C_SMALLINT_UNSIGNED_NULL = 65535 - C_INT_NULL = -2147483648 - C_INT_UNSIGNED_NULL = 4294967295 - C_BIGINT_NULL = -9223372036854775808 - C_BIGINT_UNSIGNED_NULL = 18446744073709551615 - C_FLOAT_NULL = ctypes.c_float(struct.unpack(" name (mandatory) - > type_code (mandatory) - > display_size - > internal_size - > precision - > scale - > null_ok - - This attribute will be None for operations that do not return rows or - if the cursor has not had an operation invoked via the .execute*() method yet. - - .rowcount:This read-only attribute specifies the number of rows that the last - .execute*() produced (for DQL statements like SELECT) or affected - """ - - def __init__(self, connection=None): - self._description = [] - self._rowcount = -1 - self._connection = None - self._result = None - self._fields = None - self._block = None - self._block_rows = -1 - self._block_iter = 0 - self._affected_rows = 0 - self._logfile = "" - - if connection is not None: - self._connection = connection - - def __iter__(self): - return self - - def __next__(self): - return self._taos_next() - - def next(self): - return self._taos_next() - - def _taos_next(self): - if self._result is None or self._fields is None: - raise OperationalError("Invalid use of fetch iterator") - - if self._block_rows <= self._block_iter: - block, self._block_rows = taos_fetch_row(self._result, self._fields) - if self._block_rows == 0: - raise StopIteration - self._block = list(map(tuple, zip(*block))) - self._block_iter = 0 - - data = self._block[self._block_iter] - self._block_iter += 1 - - return data - - @property - def description(self): - """Return the description of the object.""" - return self._description - - @property - def rowcount(self): - """Return the rowcount of the object""" - return self._rowcount - - @property - def affected_rows(self): - """Return the rowcount of insertion""" - return self._affected_rows - - def callproc(self, procname, *args): - """Call a stored database procedure with the given name. - - Void functionality since no stored procedures. - """ - pass - - def log(self, logfile): - self._logfile = logfile - - def close(self): - """Close the cursor.""" - if self._connection is None: - return False - - self._reset_result() - self._connection = None - - return True - - def execute(self, operation, params=None): - """Prepare and execute a database operation (query or command).""" - if not operation: - return None - - if not self._connection: - # TODO : change the exception raised here - raise ProgrammingError("Cursor is not connected") - - self._reset_result() - - stmt = operation - if params is not None: - pass - - # global querySeqNum - # querySeqNum += 1 - # localSeqNum = querySeqNum # avoid race condition - # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) - self._result = taos_query(self._connection._conn, stmt) - # print(" << Query ({}) Exec Done".format(localSeqNum)) - if self._logfile: - with open(self._logfile, "a") as logfile: - logfile.write("%s;\n" % operation) - - if taos_field_count(self._result) == 0: - affected_rows = taos_affected_rows(self._result) - self._affected_rows += affected_rows - return affected_rows - else: - self._fields = taos_fetch_fields(self._result) - return self._handle_result() - - def executemany(self, operation, seq_of_parameters): - """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.""" - pass - - def fetchone(self): - """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.""" - pass - - def fetchmany(self): - pass - - def istype(self, col, dataType): - if dataType.upper() == "BOOL": - if self._description[col][1] == FieldType.C_BOOL: - return True - if dataType.upper() == "TINYINT": - if self._description[col][1] == FieldType.C_TINYINT: - return True - if dataType.upper() == "TINYINT UNSIGNED": - if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED: - return True - if dataType.upper() == "SMALLINT": - if self._description[col][1] == FieldType.C_SMALLINT: - return True - if dataType.upper() == "SMALLINT UNSIGNED": - if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED: - return True - if dataType.upper() == "INT": - if self._description[col][1] == FieldType.C_INT: - return True - if dataType.upper() == "INT UNSIGNED": - if self._description[col][1] == FieldType.C_INT_UNSIGNED: - return True - if dataType.upper() == "BIGINT": - if self._description[col][1] == FieldType.C_BIGINT: - return True - if dataType.upper() == "BIGINT UNSIGNED": - if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED: - return True - if dataType.upper() == "FLOAT": - if self._description[col][1] == FieldType.C_FLOAT: - return True - if dataType.upper() == "DOUBLE": - if self._description[col][1] == FieldType.C_DOUBLE: - return True - if dataType.upper() == "BINARY": - if self._description[col][1] == FieldType.C_BINARY: - return True - if dataType.upper() == "TIMESTAMP": - if self._description[col][1] == FieldType.C_TIMESTAMP: - return True - if dataType.upper() == "NCHAR": - if self._description[col][1] == FieldType.C_NCHAR: - return True - if dataType.upper() == "JSON": - if self._description[col][1] == FieldType.C_JSON: - return True - - return False - - def fetchall_row(self): - """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.""" - if self._result is None or self._fields is None: - raise OperationalError("Invalid use of fetchall") - - buffer = [[] for i in range(len(self._fields))] - self._rowcount = 0 - while True: - block, num_of_fields = taos_fetch_row(self._result, self._fields) - errno = taos_errno(self._result) - if errno != 0: - raise ProgrammingError(taos_errstr(self._result), errno) - if num_of_fields == 0: - break - self._rowcount += num_of_fields - for i in range(len(self._fields)): - buffer[i].extend(block[i]) - return list(map(tuple, zip(*buffer))) - - def fetchall(self): - if self._result is None: - raise OperationalError("Invalid use of fetchall") - fields = self._fields if self._fields is not None else taos_fetch_fields(self._result) - buffer = [[] for i in range(len(fields))] - self._rowcount = 0 - while True: - block, num_of_fields = taos_fetch_block(self._result, self._fields) - errno = taos_errno(self._result) - if errno != 0: - raise ProgrammingError(taos_errstr(self._result), errno) - if num_of_fields == 0: - break - self._rowcount += num_of_fields - for i in range(len(self._fields)): - buffer[i].extend(block[i]) - return list(map(tuple, zip(*buffer))) - - def stop_query(self): - if self._result != None: - taos_stop_query(self._result) - - def nextset(self): - """ """ - pass - - def setinputsize(self, sizes): - pass - - def setutputsize(self, size, column=None): - pass - - def _reset_result(self): - """Reset the result to unused version.""" - self._description = [] - self._rowcount = -1 - if self._result is not None: - taos_free_result(self._result) - self._result = None - self._fields = None - self._block = None - self._block_rows = -1 - self._block_iter = 0 - self._affected_rows = 0 - - def _handle_result(self): - """Handle the return result from query.""" - self._description = [] - for ele in self._fields: - self._description.append((ele["name"], ele["type"], None, None, None, None, False)) - - return self._result - - def __del__(self): - self.close() diff --git a/src/connector/python/taos/error.py b/src/connector/python/taos/error.py deleted file mode 100644 index 122466fe3c448ec551fb910c402ad14bb6c93336..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/error.py +++ /dev/null @@ -1,111 +0,0 @@ -# encoding:UTF-8 -"""Python exceptions -""" - - -class Error(Exception): - def __init__(self, msg=None, errno=0xffff): - self.msg = msg - self.errno = errno - self._full_msg = "[0x%04x]: %s" % (self.errno & 0xffff, self.msg) - - def __str__(self): - return self._full_msg - - -class Warning(Exception): - """Exception raised for important warnings like data truncations while inserting.""" - - pass - - -class InterfaceError(Error): - """Exception raised for errors that are related to the database interface rather than the database itself.""" - - pass - - -class DatabaseError(Error): - """Exception raised for errors that are related to the database.""" - - pass - -class ConnectionError(Error): - """Exceptin raised for connection failed""" - pass - -class DataError(DatabaseError): - """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.""" - - pass - - -class OperationalError(DatabaseError): - """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer""" - - pass - - -class IntegrityError(DatabaseError): - """Exception raised when the relational integrity of the database is affected.""" - - pass - - -class InternalError(DatabaseError): - """Exception raised when the database encounters an internal error.""" - - pass - - -class ProgrammingError(DatabaseError): - """Exception raised for programming errors.""" - - pass - - -class NotSupportedError(DatabaseError): - """Exception raised in case a method or database API was used which is not supported by the database,.""" - - pass - - -class StatementError(DatabaseError): - """Exception raised in STMT API.""" - - pass - -class ResultError(DatabaseError): - """Result related APIs.""" - - pass - -class SchemalessError(DatabaseError): - """taos_schemaless_insert errors.""" - - def __init__(self, msg=None, errno=0xffff, affected_rows=0): - DatabaseError.__init__(self, msg, errno) - self.affected_rows = affected_rows - - def __str__(self): - return self._full_msg + "(affected rows: %d)" % self.affected_rows - - # @property - # def affected_rows(self): - # return self.affected_rows - - -class StatementError(DatabaseError): - """Exception raised in STMT API.""" - - pass - -class ResultError(DatabaseError): - """Result related APIs.""" - - pass - -class LinesError(DatabaseError): - """taos_insert_lines errors.""" - - pass \ No newline at end of file diff --git a/src/connector/python/taos/field.py b/src/connector/python/taos/field.py deleted file mode 100644 index a6d64422e238b46b096a5ae62c42566666f226ad..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/field.py +++ /dev/null @@ -1,309 +0,0 @@ -# encoding:UTF-8 -import ctypes -import math -import datetime -from ctypes import * - -from .constants import FieldType -from .error import * - -_datetime_epoch = datetime.datetime.fromtimestamp(0) - -def _convert_millisecond_to_datetime(milli): - return _datetime_epoch + datetime.timedelta(seconds=milli / 1000.0) - - -def _convert_microsecond_to_datetime(micro): - return _datetime_epoch + datetime.timedelta(seconds=micro / 1000000.0) - - -def _convert_nanosecond_to_datetime(nanosec): - return nanosec - - -def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C bool row to python row""" - _timestamp_converter = _convert_millisecond_to_datetime - if precision == FieldType.C_TIMESTAMP_MILLI: - _timestamp_converter = _convert_millisecond_to_datetime - elif precision == FieldType.C_TIMESTAMP_MICRO: - _timestamp_converter = _convert_microsecond_to_datetime - elif precision == FieldType.C_TIMESTAMP_NANO: - _timestamp_converter = _convert_nanosecond_to_datetime - else: - raise DatabaseError("Unknown precision returned from database") - - return [ - None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)] - ] - - -def _crow_bool_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C bool row to python row""" - return [ - None if ele == FieldType.C_BOOL_NULL else bool(ele) - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)] - ] - - -def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C tinyint row to python row""" - return [ - None if ele == FieldType.C_TINYINT_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)] - ] - - -def _crow_tinyint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C tinyint row to python row""" - return [ - None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ubyte))[: abs(num_of_rows)] - ] - - -def _crow_smallint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C smallint row to python row""" - return [ - None if ele == FieldType.C_SMALLINT_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[: abs(num_of_rows)] - ] - - -def _crow_smallint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C smallint row to python row""" - return [ - None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ushort))[: abs(num_of_rows)] - ] - - -def _crow_int_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C int row to python row""" - return [ - None if ele == FieldType.C_INT_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[: abs(num_of_rows)] - ] - - -def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C int row to python row""" - return [ - None if ele == FieldType.C_INT_UNSIGNED_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint))[: abs(num_of_rows)] - ] - - -def _crow_bigint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C bigint row to python row""" - return [ - None if ele == FieldType.C_BIGINT_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)] - ] - - -def _crow_bigint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C bigint row to python row""" - return [ - None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint64))[: abs(num_of_rows)] - ] - - -def _crow_float_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C float row to python row""" - return [ - None if math.isnan(ele) else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[: abs(num_of_rows)] - ] - - -def _crow_double_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C double row to python row""" - return [ - None if math.isnan(ele) else ele - for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[: abs(num_of_rows)] - ] - - -def _crow_binary_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C binary row to python row""" - assert nbytes is not None - return [ - None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode("utf-8") - for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[: abs(num_of_rows)] - ] - - -def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C nchar row to python row""" - assert nbytes is not None - res = [] - for i in range(abs(num_of_rows)): - try: - if num_of_rows >= 0: - tmpstr = ctypes.c_char_p(data) - res.append(tmpstr.value.decode("utf-8")) - else: - res.append( - ( - ctypes.cast( - data + nbytes * i, - ctypes.POINTER(ctypes.c_wchar * (nbytes // 4)), - ) - )[0].value - ) - except ValueError: - res.append(None) - - return res - - -def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C binary row to python row""" - assert nbytes is not None - res = [] - for i in range(abs(num_of_rows)): - rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop() - chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte)) - buffer = create_string_buffer(rbyte + 1) - buffer[:rbyte] = chars[0][:rbyte] - if rbyte == 1 and buffer[0] == b'\xff': - res.append(None) - else: - res.append(cast(buffer, c_char_p).value.decode("utf-8")) - return res - - -def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN): - """Function to convert C nchar row to python row""" - assert nbytes is not None - res = [] - for i in range(abs(num_of_rows)): - rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop() - chars = ctypes.cast(c_char_p(data + nbytes * i + 2), ctypes.POINTER(c_char * rbyte)) - buffer = create_string_buffer(rbyte + 1) - buffer[:rbyte] = chars[0][:rbyte] - if rbyte == 4 and buffer[:4] == b'\xff'*4: - res.append(None) - else: - res.append(cast(buffer, c_char_p).value.decode("utf-8")) - return res - - -CONVERT_FUNC = { - FieldType.C_BOOL: _crow_bool_to_python, - FieldType.C_TINYINT: _crow_tinyint_to_python, - FieldType.C_SMALLINT: _crow_smallint_to_python, - FieldType.C_INT: _crow_int_to_python, - FieldType.C_BIGINT: _crow_bigint_to_python, - FieldType.C_FLOAT: _crow_float_to_python, - FieldType.C_DOUBLE: _crow_double_to_python, - FieldType.C_BINARY: _crow_binary_to_python, - FieldType.C_TIMESTAMP: _crow_timestamp_to_python, - FieldType.C_NCHAR: _crow_nchar_to_python, - FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python, - FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python, - FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python, - FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python, - FieldType.C_JSON: _crow_nchar_to_python, -} - -CONVERT_FUNC_BLOCK = { - FieldType.C_BOOL: _crow_bool_to_python, - FieldType.C_TINYINT: _crow_tinyint_to_python, - FieldType.C_SMALLINT: _crow_smallint_to_python, - FieldType.C_INT: _crow_int_to_python, - FieldType.C_BIGINT: _crow_bigint_to_python, - FieldType.C_FLOAT: _crow_float_to_python, - FieldType.C_DOUBLE: _crow_double_to_python, - FieldType.C_BINARY: _crow_binary_to_python_block, - FieldType.C_TIMESTAMP: _crow_timestamp_to_python, - FieldType.C_NCHAR: _crow_nchar_to_python_block, - FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python, - FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python, - FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python, - FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python, - FieldType.C_JSON: _crow_nchar_to_python_block, -} - -# Corresponding TAOS_FIELD structure in C - - -class TaosField(ctypes.Structure): - _fields_ = [ - ("_name", ctypes.c_char * 65), - ("_type", ctypes.c_uint8), - ("_bytes", ctypes.c_uint16), - ] - - @property - def name(self): - return self._name.decode("utf-8") - - @property - def length(self): - """alias to self.bytes""" - return self._bytes - - @property - def bytes(self): - return self._bytes - - @property - def type(self): - return self._type - - def __dict__(self): - return {"name": self.name, "type": self.type, "bytes": self.length} - - def __str__(self): - return "{name: %s, type: %d, bytes: %d}" % (self.name, self.type, self.length) - - def __getitem__(self, item): - return getattr(self, item) - - -class TaosFields(object): - def __init__(self, fields, count): - if isinstance(fields, c_void_p): - self._fields = cast(fields, POINTER(TaosField)) - if isinstance(fields, POINTER(TaosField)): - self._fields = fields - self._count = count - self._iter = 0 - - def as_ptr(self): - return self._fields - - @property - def count(self): - return self._count - - @property - def fields(self): - return self._fields - - def __next__(self): - return self._next_field() - - def next(self): - return self._next_field() - - def _next_field(self): - if self._iter < self.count: - field = self._fields[self._iter] - self._iter += 1 - return field - else: - raise StopIteration - - def __getitem__(self, item): - return self._fields[item] - - def __iter__(self): - return self - - def __len__(self): - return self.count diff --git a/src/connector/python/taos/precision.py b/src/connector/python/taos/precision.py deleted file mode 100644 index d67da592cce6d2121ec8f2eed78a30d6fa0c446b..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/precision.py +++ /dev/null @@ -1,12 +0,0 @@ -class PrecisionEnum(object): - """Precision enums""" - - Milliseconds = 0 - Microseconds = 1 - Nanoseconds = 2 - - -class PrecisionError(Exception): - """Python datetime does not support nanoseconds error""" - - pass diff --git a/src/connector/python/taos/result.py b/src/connector/python/taos/result.py deleted file mode 100644 index 05085a493eb8ffede536476f1ddf3bcb083d82f8..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/result.py +++ /dev/null @@ -1,263 +0,0 @@ -from .cinterface import * - -# from .connection import TaosConnection -from .error import * - -from ctypes import c_void_p - - -class TaosResult(object): - """TDengine result interface""" - - def __init__(self, result, close_after=False, conn=None): - # type: (c_void_p, bool, TaosConnection) -> TaosResult - # to make the __del__ order right - self._conn = conn - self._close_after = close_after - if isinstance(result, c_void_p): - self._result = result - else: - self._result = c_void_p(result) - - self._fields = None - self._field_count = None - self._precision = None - - self._block = None - self._block_length = None - self._row_count = 0 - - def __iter__(self): - return self - - def __next__(self): - return self._next_row() - - def next(self): - # fetch next row - return self._next_row() - - def _next_row(self): - if self._result is None or self.fields is None: - raise OperationalError("Invalid use of fetch iterator") - - if self._block is None or self._block_iter >= self._block_length: - self._block, self._block_length = self.fetch_block() - self._block_iter = 0 - # self._row_count += self._block_length - - raw = self._block[self._block_iter] - self._block_iter += 1 - return raw - - @property - def fields(self): - """fields definitions of the current result""" - if self._result is None: - raise ResultError("no result object setted") - if self._fields is None: - self._fields = taos_fetch_fields(self._result) - - return self._fields - - @property - def field_count(self): - """Field count of the current result, eq to taos_field_count(result)""" - return self.fields.count - - @property - def row_count(self): - """Return the rowcount of the object""" - return self._row_count - - @property - def precision(self): - if self._precision is None: - self._precision = taos_result_precision(self._result) - return self._precision - - @property - def affected_rows(self): - return taos_affected_rows(self._result) - - # @property - def field_lengths(self): - return taos_fetch_lengths(self._result, self.field_count) - - def rows_iter(self, num_of_rows=None): - return TaosRows(self, num_of_rows) - - def blocks_iter(self): - return TaosBlocks(self) - - def fetch_block(self): - if self._result is None: - raise OperationalError("Invalid use of fetch iterator") - - block, length = taos_fetch_block_raw(self._result) - if length == 0: - raise StopIteration - precision = self.precision - field_count = self.field_count - fields = self.fields - blocks = [None] * field_count - lengths = self.field_lengths() - for i in range(field_count): - data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i] - if fields[i].type not in CONVERT_FUNC_BLOCK: - raise DatabaseError("Invalid data type returned from database") - blocks[i] = CONVERT_FUNC_BLOCK[fields[i].type](data, length, lengths[i], precision) - - return list(map(tuple, zip(*blocks))), length - - def fetch_all(self): - if self._result is None: - raise OperationalError("Invalid use of fetchall") - - if self._fields is None: - self._fields = taos_fetch_fields(self._result) - buffer = [[] for i in range(len(self._fields))] - self._row_count = 0 - while True: - block, num_of_fields = taos_fetch_block(self._result, self._fields) - errno = taos_errno(self._result) - if errno != 0: - raise ProgrammingError(taos_errstr(self._result), errno) - if num_of_fields == 0: - break - self._row_count += num_of_fields - for i in range(len(self._fields)): - buffer[i].extend(block[i]) - return list(map(tuple, zip(*buffer))) - - def fetch_all_into_dict(self): - """Fetch all rows and convert it to dict""" - names = [field.name for field in self.fields] - rows = self.fetch_all() - return list(dict(zip(names, row)) for row in rows) - - def fetch_rows_a(self, callback, param): - taos_fetch_rows_a(self._result, callback, param) - - def stop_query(self): - return taos_stop_query(self._result) - - def errno(self): - """**DO NOT** use this directly unless you know what you are doing""" - return taos_errno(self._result) - - def errstr(self): - return taos_errstr(self._result) - - def check_error(self, errno=None, close=True): - if errno is None: - errno = self.errno() - if errno != 0: - msg = self.errstr() - self.close() - raise OperationalError(msg, errno) - - def close(self): - """free result object.""" - if self._result != None and self._close_after: - taos_free_result(self._result) - self._result = None - self._fields = None - self._field_count = None - self._field_lengths = None - - def __del__(self): - self.close() - - -class TaosRows: - """TDengine result rows iterator""" - - def __init__(self, result, num_of_rows=None): - self._result = result - self._num_of_rows = num_of_rows - - def __iter__(self): - return self - - def __next__(self): - return self._next_row() - - def next(self): - return self._next_row() - - def _next_row(self): - if self._result is None: - raise OperationalError("Invalid use of fetch iterator") - if self._num_of_rows != None and self._num_of_rows <= self._result._row_count: - raise StopIteration - - row = taos_fetch_row_raw(self._result._result) - if not row: - raise StopIteration - self._result._row_count += 1 - return TaosRow(self._result, row) - - @property - def row_count(self): - """Return the rowcount of the object""" - return self._result._row_count - - -class TaosRow: - def __init__(self, result, row): - self._result = result - self._row = row - - def __str__(self): - return taos_print_row(self._row, self._result.fields, self._result.field_count) - - def __call__(self): - return self.as_tuple() - - def _astuple(self): - return self.as_tuple() - - def __iter__(self): - return self.as_tuple() - - def as_ptr(self): - return self._row - - def as_tuple(self): - precision = self._result.precision - field_count = self._result.field_count - blocks = [None] * field_count - fields = self._result.fields - field_lens = self._result.field_lengths() - for i in range(field_count): - data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i] - if fields[i].type not in CONVERT_FUNC: - raise DatabaseError("Invalid data type returned from database") - if data is None: - blocks[i] = None - else: - blocks[i] = CONVERT_FUNC[fields[i].type](data, 1, field_lens[i], precision)[0] - return tuple(blocks) - - def as_dict(self): - values = self.as_tuple() - names = self._result.fields - dict(zip(names, values)) - - - -class TaosBlocks: - """TDengine result blocks iterator""" - - def __init__(self, result): - self._result = result - - def __iter__(self): - return self - - def __next__(self): - return self._result.fetch_block() - - def next(self): - return self._result.fetch_block() diff --git a/src/connector/python/taos/schemaless.py b/src/connector/python/taos/schemaless.py deleted file mode 100644 index 35967412f78a63e67d63f0e58bbf903f21fb275a..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/schemaless.py +++ /dev/null @@ -1,17 +0,0 @@ - -class SmlPrecision: - """Schemaless timestamp precision constants""" - NOT_CONFIGURED = 0 # C.TSDB_SML_TIMESTAMP_NOT_CONFIGURED - HOURS = 1 - MINUTES = 2 - SECONDS = 3 - MILLI_SECONDS = 4 - MICRO_SECONDS = 5 - NANO_SECONDS = 6 - -class SmlProtocol: - """Schemaless protocol constants""" - UNKNOWN_PROTOCOL = 0 - LINE_PROTOCOL = 1 - TELNET_PROTOCOL = 2 - JSON_PROTOCOL = 3 \ No newline at end of file diff --git a/src/connector/python/taos/statement.py b/src/connector/python/taos/statement.py deleted file mode 100644 index 155e98173b7f920640aa84d0fcda618d2669bb1e..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/statement.py +++ /dev/null @@ -1,85 +0,0 @@ -from taos.cinterface import * -from taos.error import * -from taos.result import * - - -class TaosStmt(object): - """TDengine STMT interface""" - - def __init__(self, stmt, conn = None): - self._conn = conn - self._stmt = stmt - - def set_tbname(self, name): - """Set table name if needed. - - Note that the set_tbname* method should only used in insert statement - """ - if self._stmt is None: - raise StatementError("Invalid use of set_tbname") - taos_stmt_set_tbname(self._stmt, name) - - def prepare(self, sql): - # type: (str) -> None - taos_stmt_prepare(self._stmt, sql) - - def set_tbname_tags(self, name, tags): - # type: (str, Array[TaosBind]) -> None - """Set table name with tags, tags is array of BindParams""" - if self._stmt is None: - raise StatementError("Invalid use of set_tbname") - taos_stmt_set_tbname_tags(self._stmt, name, tags) - - def bind_param(self, params, add_batch=True): - # type: (Array[TaosBind], bool) -> None - if self._stmt is None: - raise StatementError("Invalid use of stmt") - taos_stmt_bind_param(self._stmt, params) - if add_batch: - taos_stmt_add_batch(self._stmt) - - def bind_param_batch(self, binds, add_batch=True): - # type: (Array[TaosMultiBind], bool) -> None - if self._stmt is None: - raise StatementError("Invalid use of stmt") - taos_stmt_bind_param_batch(self._stmt, binds) - if add_batch: - taos_stmt_add_batch(self._stmt) - - def add_batch(self): - if self._stmt is None: - raise StatementError("Invalid use of stmt") - taos_stmt_add_batch(self._stmt) - - def execute(self): - if self._stmt is None: - raise StatementError("Invalid use of execute") - taos_stmt_execute(self._stmt) - - def use_result(self): - result = taos_stmt_use_result(self._stmt) - return TaosResult(result) - - def close(self): - """Close stmt.""" - if self._stmt is None: - return - taos_stmt_close(self._stmt) - self._stmt = None - - def __del__(self): - self.close() - - -if __name__ == "__main__": - from taos.connection import TaosConnection - - conn = TaosConnection() - - stmt = conn.statement("select * from log.log limit 10") - stmt.execute() - result = stmt.use_result() - for row in result: - print(row) - stmt.close() - conn.close() diff --git a/src/connector/python/taos/stream.py b/src/connector/python/taos/stream.py deleted file mode 100644 index fe3c8c85e3279511972293882224bf20c30dfa64..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/stream.py +++ /dev/null @@ -1,22 +0,0 @@ -from taos.cinterface import * -from taos.error import * -from taos.result import * - - -class TaosStream(object): - """TDengine Stream interface""" - - def __init__(self, stream): - self._raw = stream - - def as_ptr(self): - return self._raw - - def close(self): - """Close stmt.""" - if self._raw is not None: - taos_close_stream(self._raw) - self._raw = None - - def __del__(self): - self.close() diff --git a/src/connector/python/taos/subscription.py b/src/connector/python/taos/subscription.py deleted file mode 100644 index 3c6958b6f8d55791b9753a84a4bbd7653bdae780..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/subscription.py +++ /dev/null @@ -1,49 +0,0 @@ -from taos.result import TaosResult -from .cinterface import * -from .error import * - - -class TaosSubscription(object): - """TDengine subscription object""" - - def __init__(self, sub, with_callback = False): - self._sub = sub - self._with_callback = with_callback - - def consume(self): - """Consume rows of a subscription""" - if self._sub is None: - raise OperationalError("Invalid use of consume") - if self._with_callback: - raise OperationalError("DONOT use consume method in an subscription with callback") - result = taos_consume(self._sub) - return TaosResult(result) - - def close(self, keepProgress=True): - """Close the Subscription.""" - if self._sub is None: - return False - - taos_unsubscribe(self._sub, keepProgress) - self._sub = None - return True - - def __del__(self): - self.close() - - -if __name__ == "__main__": - from .connection import TaosConnection - - conn = TaosConnection(host="127.0.0.1", user="root", password="taosdata", database="test") - - # Generate a cursor object to run SQL commands - sub = conn.subscribe(True, "test", "select * from meters;", 1000) - - for i in range(0, 10): - data = sub.consume() - for d in data: - print(d) - - sub.close() - conn.close() diff --git a/src/connector/python/taos/timestamp.py b/src/connector/python/taos/timestamp.py deleted file mode 100644 index ab5679fdf12e2942aa94f76716ff98e6d2a88d69..0000000000000000000000000000000000000000 --- a/src/connector/python/taos/timestamp.py +++ /dev/null @@ -1,17 +0,0 @@ - -class TimestampType(object): - """Choose which type that parsing TDengine timestamp data to - - - DATETIME: use python datetime.datetime, note that it does not support nanosecond precision, - and python taos will use raw c_int64 as a fallback for nanosecond results. - - NUMPY: use numpy.datetime64 type. - - RAW: use raw c_int64. - - TAOS: use taos' TaosTimestamp. - """ - DATETIME = 0, - NUMPY = 1, - RAW = 2, - TAOS = 3, - -class TaosTimestamp: - pass diff --git a/src/connector/python/tests/test-td6231.py b/src/connector/python/tests/test-td6231.py deleted file mode 100644 index e55d22c10734eedcbd5be8012eaeb3fb3d51e381..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test-td6231.py +++ /dev/null @@ -1,50 +0,0 @@ -from taos import * - -conn = connect() - -dbname = "pytest_taos_stmt_multi" -conn.execute("drop database if exists %s" % dbname) -conn.execute("create database if not exists %s" % dbname) -conn.select_db(dbname) - -conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ - ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ - su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", -) - -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - -params = new_multi_binds(16) -params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) -params[1].bool((True, None, False)) -params[2].tinyint([-128, -128, None]) # -128 is tinyint null -params[3].tinyint([0, 127, None]) -params[4].smallint([3, None, 2]) -params[5].int([3, 4, None]) -params[6].bigint([3, 4, None]) -params[7].tinyint_unsigned([3, 4, None]) -params[8].smallint_unsigned([3, 4, None]) -params[9].int_unsigned([3, 4, None]) -params[10].bigint_unsigned([3, 4, None]) -params[11].float([3, None, 1]) -params[12].double([3, None, 1.2]) -params[13].binary(["abc", "dddafadfadfadfadfa", None]) -# params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) -params[14].nchar([None, None, None]) -params[15].timestamp([None, None, 1626861392591]) -stmt.bind_param_batch(params) -stmt.execute() - - -result = stmt.use_result() -assert result.affected_rows == 3 -result.close() - -result = conn.query("select * from log") -for row in result: - print(row) -result.close() -stmt.close() -conn.close() diff --git a/src/connector/python/tests/test_ctaos.py b/src/connector/python/tests/test_ctaos.py deleted file mode 100644 index 7b9566931f2b29dcbdc8646d2f087ebf40e716cc..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_ctaos.py +++ /dev/null @@ -1,162 +0,0 @@ -from taos.cinterface import * -from taos.precision import * -from taos.bind import * - -import time -import datetime -import pytest - -@pytest.fixture -def conn(): - return CTaosInterface().connect() - - -def test_simple(conn, caplog): - dbname = "pytest_ctaos_simple" - try: - res = taos_query(conn, "create database if not exists %s" % dbname) - taos_free_result(res) - - taos_select_db(conn, dbname) - - res = taos_query( - conn, - "create table if not exists log(ts timestamp, level tinyint, content binary(100), ipaddr binary(134))", - ) - taos_free_result(res) - - res = taos_query(conn, "insert into log values(now, 1, 'hello', 'test')") - taos_free_result(res) - - res = taos_query(conn, "select level,content,ipaddr from log limit 1") - - fields = taos_fetch_fields_raw(res) - field_count = taos_field_count(res) - - fields = taos_fetch_fields(res) - for field in fields: - print(field) - - # field_lengths = taos_fetch_lengths(res, field_count) - # if not field_lengths: - # raise "fetch lengths error" - - row = taos_fetch_row_raw(res) - rowstr = taos_print_row(row, fields, field_count) - assert rowstr == "1 hello test" - - row, num = taos_fetch_row(res, fields) - print(row) - taos_free_result(res) - taos_query(conn, "drop database if exists " + dbname) - taos_close(conn) - except Exception as err: - taos_query(conn, "drop database if exists " + dbname) - raise err - - -def test_stmt(conn, caplog): - dbname = "pytest_ctaos_stmt" - try: - res = taos_query(conn, "drop database if exists %s" % dbname) - taos_free_result(res) - res = taos_query(conn, "create database if not exists %s" % dbname) - taos_free_result(res) - - taos_select_db(conn, dbname) - - res = taos_query( - conn, - "create table if not exists log(ts timestamp, nil tinyint, ti tinyint, si smallint, ii int,\ - bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100))", - ) - taos_free_result(res) - - stmt = taos_stmt_init(conn) - - taos_stmt_prepare(stmt, "insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - - params = new_bind_params(14) - params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds) - params[1].null() - params[2].tinyint(2) - params[3].smallint(3) - params[4].int(4) - params[5].bigint(5) - params[6].tinyint_unsigned(6) - params[7].smallint_unsigned(7) - params[8].int_unsigned(8) - params[9].bigint_unsigned(9) - params[10].float(10.1) - params[11].double(10.11) - params[12].binary("hello") - params[13].nchar("stmt") - taos_stmt_bind_param(stmt, params) - taos_stmt_add_batch(stmt) - taos_stmt_execute(stmt) - - res = taos_query(conn, "select * from log limit 1") - - fields = taos_fetch_fields(res) - filed_count = taos_field_count(res) - - row = taos_fetch_row_raw(res) - rowstr = taos_print_row(row, fields, filed_count, 100) - - taos_free_result(res) - taos_query(conn, "drop database if exists " + dbname) - taos_close(conn) - - assert rowstr == "1626861392589 NULL 2 3 4 5 6 7 8 9 10.100000 10.110000 hello stmt" - except Exception as err: - taos_query(conn, "drop database if exists " + dbname) - raise err - -def stream_callback(param, result, row): - # type: (c_void_p, c_void_p, c_void_p) -> None - try: - if result == None or row == None: - return - result = c_void_p(result) - row = c_void_p(row) - fields = taos_fetch_fields_raw(result) - num_fields = taos_field_count(result) - s = taos_print_row(row, fields, num_fields) - print(s) - taos_stop_query(result) - except Exception as err: - print(err) - -def test_stream(conn, caplog): - dbname = "pytest_ctaos_stream" - try: - res = taos_query(conn, "create database if not exists %s" % dbname) - taos_free_result(res) - - taos_select_db(conn, dbname) - - res = taos_query( - conn, - "create table if not exists log(ts timestamp, n int)", - ) - taos_free_result(res) - - res = taos_query(conn, "select count(*) from log interval(5s)") - cc = taos_num_fields(res) - assert cc == 2 - - stream = taos_open_stream(conn, "select count(*) from log interval(5s)", stream_callback, 0, None, None) - print("waiting for data") - time.sleep(1) - - for i in range(0, 2): - res = taos_query(conn, "insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") - taos_free_result(res) - time.sleep(2) - taos_close_stream(stream) - taos_query(conn, "drop database if exists " + dbname) - taos_close(conn) - except Exception as err: - taos_query(conn, "drop database if exists " + dbname) - raise err diff --git a/src/connector/python/tests/test_info.py b/src/connector/python/tests/test_info.py deleted file mode 100644 index bddfec7ef9ddbc203adfcadd262839048466592c..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_info.py +++ /dev/null @@ -1,23 +0,0 @@ -from taos.cinterface import * - -from taos import * - -import pytest - -@pytest.fixture -def conn(): - return connect() - -def test_client_info(): - print(taos_get_client_info()) - None - -def test_server_info(conn): - # type: (TaosConnection) -> None - print(conn.client_info) - print(conn.server_info) - None - -if __name__ == "__main__": - test_client_info() - test_server_info(connect()) diff --git a/src/connector/python/tests/test_lines.py b/src/connector/python/tests/test_lines.py deleted file mode 100644 index 51d23b8e891d398b404086fdb2ff2910dcc1eb0a..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_lines.py +++ /dev/null @@ -1,118 +0,0 @@ -from taos.error import OperationalError, SchemalessError -from taos import connect, new_bind_params, PrecisionEnum -from taos import * - -from ctypes import * -import taos -import pytest - - -@pytest.fixture -def conn(): - # type: () -> taos.TaosConnection - return connect() - - -def test_schemaless_insert_update_2(conn): - # type: (TaosConnection) -> None - - dbname = "test_schemaless_insert_update_2" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) - conn.select_db(dbname) - - lines = [ - 'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000', - ] - res = conn.schemaless_insert(lines, 1, 0) - print("affected rows: ", res) - assert(res == 1) - - result = conn.query("select * from st") - [before] = result.fetch_all_into_dict() - assert(before["c3"] == "passitagin, abc") - - lines = [ - 'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000', - ] - res = conn.schemaless_insert(lines, 1, 0) - result = conn.query("select * from st") - [after] = result.fetch_all_into_dict() - assert(after["c3"] == "passitagin") - - conn.execute("drop database if exists %s" % dbname) - conn.close() - - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - print(err) - raise err - -def test_schemaless_insert(conn): - # type: (TaosConnection) -> None - - dbname = "pytest_taos_schemaless_insert" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) - conn.select_db(dbname) - - lines = [ - 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000', - 'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000', - 'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000', - ] - res = conn.schemaless_insert(lines, 1, 0) - print("affected rows: ", res) - assert(res == 3) - - lines = [ - 'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000', - ] - res = conn.schemaless_insert(lines, 1, 0) - print("affected rows: ", res) - assert(res == 1) - result = conn.query("select * from st") - - dict2 = result.fetch_all_into_dict() - print(dict2) - result.row_count - all = result.rows_iter() - for row in all: - print(row) - result.close() - assert(result.row_count == 2) - - # error test - lines = [ - ',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000', - ] - try: - res = conn.schemaless_insert(lines, 1, 0) - print(res) - # assert(False) - except SchemalessError as err: - pass - - result = conn.query("select * from st") - result.row_count - all = result.rows_iter() - for row in all: - print(row) - result.close() - - conn.execute("drop database if exists %s" % dbname) - conn.close() - - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - print(err) - raise err - - -if __name__ == "__main__": - test_schemaless_insert(connect()) - test_schemaless_insert_update_2(connect()) diff --git a/src/connector/python/tests/test_query.py b/src/connector/python/tests/test_query.py deleted file mode 100644 index f4e139b1f14df29e8b6304dd2ca03519ea274f43..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_query.py +++ /dev/null @@ -1,43 +0,0 @@ -from datetime import datetime -import taos -import pytest - -@pytest.fixture -def conn(): - return taos.connect() - -def test_query(conn): - """This test will use fetch_block for rows fetching, significantly faster than rows_iter""" - result = conn.query("select * from log.log limit 10000") - fields = result.fields - for field in fields: - print("field: %s" % field) - start = datetime.now() - for row in result: - # print(row) - None - end = datetime.now() - elapsed = end - start - print("elapsed time: ", elapsed) - result.close() - conn.close() - -def test_query_row_iter(conn): - """This test will use fetch_row for each row fetching, this is the only way in async callback""" - result = conn.query("select * from log.log limit 10000") - fields = result.fields - for field in fields: - print("field: %s" % field) - start = datetime.now() - for row in result.rows_iter(): - # print(row) - None - end = datetime.now() - elapsed = end - start - print("elapsed time: ", elapsed) - result.close() - conn.close() - -if __name__ == "__main__": - test_query(taos.connect(database = "log")) - test_query_row_iter(taos.connect(database = "log")) diff --git a/src/connector/python/tests/test_query_a.py b/src/connector/python/tests/test_query_a.py deleted file mode 100644 index 2b4be5695a87f1fd1017435b13983df7c4f70f06..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_query_a.py +++ /dev/null @@ -1,66 +0,0 @@ -from taos import * -from ctypes import * -import taos -import pytest -import time - - -@pytest.fixture -def conn(): - return taos.connect() - -def fetch_callback(p_param, p_result, num_of_rows): - print("fetched ", num_of_rows, "rows") - p = cast(p_param, POINTER(Counter)) - result = TaosResult(p_result) - - if num_of_rows == 0: - print("fetching completed") - p.contents.done = True - result.close() - return - if num_of_rows < 0: - p.contents.done = True - result.check_error(num_of_rows) - result.close() - return None - - for row in result.rows_iter(num_of_rows): - # print(row) - None - p.contents.count += result.row_count - result.fetch_rows_a(fetch_callback, p_param) - - - -def query_callback(p_param, p_result, code): - # type: (c_void_p, c_void_p, c_int) -> None - if p_result == None: - return - result = TaosResult(p_result) - if code == 0: - result.fetch_rows_a(fetch_callback, p_param) - result.check_error(code) - - -class Counter(Structure): - _fields_ = [("count", c_int), ("done", c_bool)] - - def __str__(self): - return "{ count: %d, done: %s }" % (self.count, self.done) - - -def test_query(conn): - # type: (TaosConnection) -> None - counter = Counter(count=0) - conn.query_a("select * from log.log", query_callback, byref(counter)) - - while not counter.done: - print("wait query callback") - time.sleep(1) - print(counter) - conn.close() - - -if __name__ == "__main__": - test_query(taos.connect()) diff --git a/src/connector/python/tests/test_stmt.py b/src/connector/python/tests/test_stmt.py deleted file mode 100644 index 3368ecb6a9336a4295790f2cd55314ac9bb6290e..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_stmt.py +++ /dev/null @@ -1,150 +0,0 @@ -# encoding:UTF-8 -from taos import * - -from ctypes import * -from datetime import datetime -import taos -import pytest - -@pytest.fixture -def conn(): - # type: () -> taos.TaosConnection - return connect() - -def test_stmt_insert(conn): - # type: (TaosConnection) -> None - - dbname = "pytest_taos_stmt" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - - conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ - bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", - ) - conn.load_table_info("log") - - - stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") - params = new_bind_params(16) - params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds) - params[1].bool(True) - params[2].null() - params[3].tinyint(2) - params[4].smallint(3) - params[5].int(4) - params[6].bigint(5) - params[7].tinyint_unsigned(6) - params[8].smallint_unsigned(7) - params[9].int_unsigned(8) - params[10].bigint_unsigned(9) - params[11].float(10.1) - params[12].double(10.11) - params[13].binary("hello") - params[14].nchar("stmt") - params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds) - - stmt.bind_param(params) - stmt.execute() - - result = stmt.use_result() - assert result.affected_rows == 1 - result.close() - stmt.close() - - stmt = conn.statement("select * from log") - stmt.execute() - result = stmt.use_result() - row = result.next() - print(row) - assert row[2] == None - for i in range(3, 11): - assert row[i] == i - 1 - #float == may not work as expected - # assert row[10] == c_float(10.1) - assert row[12] == 10.11 - assert row[13] == "hello" - assert row[14] == "stmt" - - conn.execute("drop database if exists %s" % dbname) - conn.close() - - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - -def test_stmt_insert_multi(conn): - # type: (TaosConnection) -> None - - dbname = "pytest_taos_stmt_multi" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - - conn.execute( - "create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ - bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ - ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", - ) - conn.load_table_info("log") - - start = datetime.now() - stmt = conn.statement("insert into log values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - - params = new_multi_binds(16) - params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) - params[1].bool((True, None, False)) - params[2].tinyint([-128, -128, None]) # -128 is tinyint null - params[3].tinyint([0, 127, None]) - params[4].smallint([3, None, 2]) - params[5].int([3, 4, None]) - params[6].bigint([3, 4, None]) - params[7].tinyint_unsigned([3, 4, None]) - params[8].smallint_unsigned([3, 4, None]) - params[9].int_unsigned([3, 4, None]) - params[10].bigint_unsigned([3, 4, None]) - params[11].float([3, None, 1]) - params[12].double([3, None, 1.2]) - params[13].binary(["abc", "dddafadfadfadfadfa", None]) - params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) - params[15].timestamp([None, None, 1626861392591]) - stmt.bind_param_batch(params) - - stmt.execute() - end = datetime.now() - print("elapsed time: ", end - start) - result = stmt.use_result() - assert result.affected_rows == 3 - result.close() - stmt.close() - - stmt = conn.statement("select * from log") - stmt.execute() - result = stmt.use_result() - for row in result: - print(row) - result.close() - - stmt.close() - - # start = datetime.now() - # conn.query("insert into log values(1626861392660, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)(1626861392661, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)(1626861392662, true, NULL, 0, 3,3,3,3,3,3,3,3.0,3.0, 'abc','涛思数据',NULL)") - - # end = datetime.now() - # print("elapsed time: ", end - start) - - conn.execute("drop database if exists %s" % dbname) - conn.close() - - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err -if __name__ == "__main__": - test_stmt_insert(connect()) - test_stmt_insert_multi(connect()) \ No newline at end of file diff --git a/src/connector/python/tests/test_stream.py b/src/connector/python/tests/test_stream.py deleted file mode 100644 index 32ec4c5999c975be907cf69a42a04b5f4dd5d54c..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_stream.py +++ /dev/null @@ -1,71 +0,0 @@ -from taos.cinterface import * -from taos.precision import * -from taos.bind import * -from taos import * -from ctypes import * -import time -import pytest - - -@pytest.fixture -def conn(): - return connect() - - -def stream_callback(p_param, p_result, p_row): - # type: (c_void_p, c_void_p, c_void_p) -> None - - if p_result == None or p_row == None: - return - result = TaosResult(p_result) - row = TaosRow(result, p_row) - try: - ts, count = row.as_tuple() - print(ts, count) - p = cast(p_param, POINTER(Counter)) - p.contents.count += count - print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count)) - - except Exception as err: - print(err) - raise err - - -class Counter(ctypes.Structure): - _fields_ = [ - ("count", c_int), - ] - - def __str__(self): - return "%d" % self.count - - -def test_stream(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_stream" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.execute("create table if not exists log(ts timestamp, n int)") - - result = conn.query("select count(*) from log interval(5s)") - assert result.field_count == 2 - counter = Counter() - counter.count = 0 - stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter)) - - for _ in range(0, 20): - conn.execute("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)") - time.sleep(2) - stream.close() - conn.execute("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_stream(connect()) diff --git a/src/connector/python/tests/test_subscribe.py b/src/connector/python/tests/test_subscribe.py deleted file mode 100644 index d8acd60e4f3b32bb87a9663b3f7dc43a73f2877b..0000000000000000000000000000000000000000 --- a/src/connector/python/tests/test_subscribe.py +++ /dev/null @@ -1,100 +0,0 @@ -from taos.subscription import TaosSubscription -from taos import * -from ctypes import * -import taos -import pytest -import time -from random import random - - -@pytest.fixture -def conn(): - return taos.connect() - - -def test_subscribe(conn): - # type: (TaosConnection) -> None - - dbname = "pytest_taos_subscribe_callback" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.select_db(dbname) - conn.execute("create table if not exists log(ts timestamp, n int)") - for i in range(10): - conn.execute("insert into log values(now, %d)" % i) - - sub = conn.subscribe(True, "test", "select * from log", 1000) - print("# consume from begin") - for ts, n in sub.consume(): - print(ts, n) - - print("# consume new data") - for i in range(5): - conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i)) - result = sub.consume() - for ts, n in result: - print(ts, n) - - print("# consume with a stop condition") - for i in range(10): - conn.execute("insert into log values(now, %d)" % int(random() * 10)) - result = sub.consume() - try: - ts, n = next(result) - print(ts, n) - if n > 5: - result.stop_query() - print("## stopped") - break - except StopIteration: - continue - - sub.close() - - conn.execute("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - - -def subscribe_callback(p_sub, p_result, p_param, errno): - # type: (c_void_p, c_void_p, c_void_p, c_int) -> None - print("callback") - result = TaosResult(c_void_p(p_result)) - result.check_error(errno) - for row in result.rows_iter(): - ts, n = row() - print(ts, n) - - -def test_subscribe_callback(conn): - # type: (TaosConnection) -> None - dbname = "pytest_taos_subscribe_callback" - try: - conn.execute("drop database if exists %s" % dbname) - conn.execute("create database if not exists %s" % dbname) - conn.execute("use %s" % dbname) - conn.execute("create table if not exists log(ts timestamp, n int)") - - print("# subscribe with callback") - sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback) - - for i in range(10): - conn.execute("insert into log values(now, %d)" % i) - time.sleep(0.7) - sub.close() - - conn.execute("drop database if exists %s" % dbname) - conn.close() - except Exception as err: - conn.execute("drop database if exists %s" % dbname) - conn.close() - raise err - - -if __name__ == "__main__": - test_subscribe(taos.connect()) - test_subscribe_callback(taos.connect())