未验证 提交 04114474 编写于 作者: L Linhe Huo 提交者: GitHub

[TD-10770]<fix>: fix python connector tests and examples error in case of new...

[TD-10770]<fix>: fix python connector tests and examples error in case of new schemaless api (#8458)

* [TD-10770]<fix>: fix python connector tests and examples error in case of new schemaless api

* [TD-5892]<fix>: update python connector version to 2.1.1
上级 cc1a1b04
import taos
from taos import SmlProtocol, SmlPrecision
conn = taos.connect()
dbname = "pytest_line"
......@@ -9,10 +10,10 @@ conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000',
]
conn.schemaless_insert(lines, 0, "ns")
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
print("inserted")
conn.schemaless_insert(lines, 0, "ns")
conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED)
result = conn.query("show tables")
for row in result:
......
[tool.poetry]
name = "taos"
version = "2.1.0"
version = "2.1.1"
description = "TDengine connector for python"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "AGPL-3.0"
......
......@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="taos",
version="2.1.0",
version="2.1.1",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
......
......@@ -440,6 +440,7 @@ from .cursor import *
from .result import *
from .statement import *
from .subscription import *
from .schemaless import *
try:
import importlib.metadata
......@@ -468,6 +469,8 @@ __all__ = [
"TaosRow",
"TaosStmt",
"PrecisionEnum",
"SmlPrecision",
"SmlProtocol"
]
def connect(*args, **kwargs):
......
......@@ -12,6 +12,7 @@ except:
from .error import *
from .bind import *
from .field import *
from .schemaless import *
# stream callback
......@@ -810,27 +811,27 @@ def taos_stmt_use_result(stmt):
return result
try:
_libtaos.taos_insert_lines.restype = c_int
_libtaos.taos_insert_lines.argstype = c_void_p, c_void_p, c_int
_libtaos.taos_schemaless_insert.restype = c_void_p
_libtaos.taos_schemaless_insert.argstype = c_void_p, c_void_p, c_int, c_int, c_int
except AttributeError:
print("WARNING: libtaos(%s) does not support insert_lines" % taos_get_client_info())
print("WARNING: libtaos(%s) does not support taos_schemaless_insert" % taos_get_client_info())
def taos_schemaless_insert(connection, lines, protocol, precision):
# type: (c_void_p, list[str] | tuple(str)) -> None
# type: (c_void_p, list[str] | tuple(str), SmlProtocol, SmlPrecision) -> int
num_of_lines = len(lines)
lines = (c_char_p(line.encode("utf-8")) for line in lines)
lines_type = ctypes.c_char_p * num_of_lines
p_lines = lines_type(*lines)
res = c_void_p(_libtaos.taos_schemaless_insert(connection, p_lines, num_of_lines, protocol, precision))
errno = taos_errno(res)
affected_rows = taos_affected_rows(res)
if errno != 0:
errstr = taos_errstr(res)
taos_free_result(res)
print("schemaless_insert error affected rows: {}".format(taos_affected_rows(res)))
raise SchemalessError(errstr, errno)
raise SchemalessError(errstr, errno, affected_rows)
taos_free_result(res)
return errno
return affected_rows
class CTaosInterface(object):
def __init__(self, config=None):
......
......@@ -72,10 +72,9 @@ class TaosConnection(object):
taos_select_db(self._conn, database)
def execute(self, sql):
# type: (str) -> None
# type: (str) -> int
"""Simplely execute sql ignoring the results"""
res = taos_query(self._conn, sql)
taos_free_result(res)
return self.query(sql).affected_rows
def query(self, sql):
# type: (str) -> TaosResult
......@@ -118,7 +117,7 @@ class TaosConnection(object):
return TaosStream(stream)
def schemaless_insert(self, lines, protocol, precision):
# type: (list[str]) -> None
# type: (list[str], SmlProtocol, SmlPrecision) -> int
"""
1.Line protocol and schemaless support
......@@ -171,6 +170,7 @@ class TaosConnection(object):
conn.schemaless_insert(lines, 2, None)
"""
print(lines, protocol, precision)
return taos_schemaless_insert(self._conn, lines, protocol, precision)
......
......@@ -83,7 +83,16 @@ class ResultError(DatabaseError):
class SchemalessError(DatabaseError):
"""taos_schemaless_insert errors."""
pass
def __init__(self, msg=None, errno=0xffff, affected_rows=0):
DatabaseError.__init__(self, msg, errno)
self.affected_rows = affected_rows
def __str__(self):
return self._full_msg + "(affected rows: %d)" % self.affected_rows
# @property
# def affected_rows(self):
# return self.affected_rows
class StatementError(DatabaseError):
......
......@@ -123,6 +123,12 @@ class TaosResult(object):
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetch_all_into_dict(self):
"""Fetch all rows and convert it to dict"""
names = [field.name for field in self.fields]
rows = self.fetch_all()
return list(dict(zip(names, row)) for row in rows)
def fetch_rows_a(self, callback, param):
taos_fetch_rows_a(self._result, callback, param)
......@@ -228,6 +234,12 @@ class TaosRow:
blocks[i] = CONVERT_FUNC[fields[i].type](data, 1, field_lens[i], precision)[0]
return tuple(blocks)
def as_dict(self):
values = self.as_tuple()
names = self._result.fields
dict(zip(names, values))
class TaosBlocks:
"""TDengine result blocks iterator"""
......
class SmlPrecision:
"""Schemaless timestamp precision constants"""
NOT_CONFIGURED = 0 # C.TSDB_SML_TIMESTAMP_NOT_CONFIGURED
HOURS = 1
MINUTES = 2
SECONDS = 3
MILLI_SECONDS = 4
MICRO_SECONDS = 5
NANO_SECONDS = 6
class SmlProtocol:
"""Schemaless protocol constants"""
UNKNOWN_PROTOCOL = 0
LINE_PROTOCOL = 1
TELNET_PROTOCOL = 2
JSON_PROTOCOL = 3
\ No newline at end of file
from taos.error import OperationalError
from taos.error import OperationalError, SchemalessError
from taos import connect, new_bind_params, PrecisionEnum
from taos import *
......@@ -13,32 +13,95 @@ def conn():
return connect()
def test_schemaless_insert_update_2(conn):
# type: (TaosConnection) -> None
dbname = "test_schemaless_insert_update_2"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
]
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 1)
result = conn.query("select * from st")
[before] = result.fetch_all_into_dict()
assert(before["c3"] == "passitagin, abc")
lines = [
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
]
res = conn.schemaless_insert(lines, 1, 0)
result = conn.query("select * from st")
[after] = result.fetch_all_into_dict()
assert(after["c3"] == "passitagin")
conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
conn.close()
print(err)
raise err
def test_schemaless_insert(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_schemaless_insert"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 3)
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000',
]
conn.schemaless_insert(lines, 0, "ns")
res = conn.schemaless_insert(lines, 1, 0)
print("affected rows: ", res)
assert(res == 1)
result = conn.query("select * from st")
dict2 = result.fetch_all_into_dict()
print(dict2)
result.row_count
all = result.rows_iter()
for row in all:
print(row)
result.close()
assert(result.row_count == 2)
# error test
lines = [
',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000',
]
try:
res = conn.schemaless_insert(lines, 1, 0)
print(res)
# assert(False)
except SchemalessError as err:
pass
result = conn.query("select * from st")
result.row_count
all = result.rows_iter()
for row in all:
print(row)
result.close()
print(result.row_count)
conn.execute("drop database if exists %s" % dbname)
conn.close()
......@@ -52,3 +115,4 @@ def test_schemaless_insert(conn):
if __name__ == "__main__":
test_schemaless_insert(connect())
test_schemaless_insert_update_2(connect())
# encoding:UTF-8
from taos import *
from ctypes import *
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册