提交 da32a257 编写于 作者: sangshuduo's avatar sangshuduo

[TD-2971] feature: make python connector support unsigned type. fix constants id.

上级 ebc60c02
......@@ -19,10 +19,10 @@ class FieldType(object):
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 12
C_SMALLINT_UNSIGNED = 13
C_INT_UNSIGNED = 14
C_BIGINT_UNSIGNED = 15
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
......
......@@ -523,6 +523,7 @@ class CTaosInterface(object):
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC_BLOCK:
print("CBD cinterface.py LN526")
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC_BLOCK[fields[i]['type']](
data, num_of_rows, fieldLen[i], isMicro)
......@@ -547,6 +548,7 @@ class CTaosInterface(object):
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
print("CBD cinterface.py LN551")
raise DatabaseError(
"Invalid data type returned from database")
if data is None:
......
......@@ -19,10 +19,10 @@ class FieldType(object):
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 12
C_SMALLINT_UNSIGNED = 13
C_INT_UNSIGNED = 14
C_BIGINT_UNSIGNED = 15
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
......
......@@ -19,10 +19,10 @@ class FieldType(object):
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 12
C_SMALLINT_UNSIGNED = 13
C_INT_UNSIGNED = 14
C_BIGINT_UNSIGNED = 15
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
......
......@@ -8,6 +8,7 @@ paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
......
......@@ -4,11 +4,14 @@ from .error import *
import math
import datetime
def _convert_millisecond_to_datetime(milli):
return datetime.datetime.fromtimestamp(milli/1000.0)
return datetime.datetime.fromtimestamp(milli / 1000.0)
def _convert_microsecond_to_datetime(micro):
return datetime.datetime.fromtimestamp(micro/1000000.0)
return datetime.datetime.fromtimestamp(micro / 1000000.0)
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
......@@ -18,168 +21,224 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_bool))[
:abs(num_of_rows)]]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
"""
assert(nbytes is not None)
res=[]
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append( tmpstr.value.decode() )
res.append(tmpstr.value.decode())
else:
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
res.append((ctypes.cast(data + nbytes * i,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
return res
def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
res=[]
res = []
if num_of_rows > 0:
for i in range(abs(num_of_rows)):
try:
rbyte=ctypes.cast(data+nbytes*i,ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode()[0:rbyte] )
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
rbyte=ctypes.cast(data+nbytes*i,ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode()[0:rbyte] )
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
"""
assert(nbytes is not None)
res=[]
res = []
if num_of_rows >= 0:
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode() )
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
res.append( (ctypes.cast(data+nbytes*i+2, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
res.append((ctypes.cast(data + nbytes * i + 2,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
return res
_CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python
}
_CONVERT_FUNC_BLOCK = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python_block,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python_block
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python_block
}
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [('name', ctypes.c_char * 65),
('type', ctypes.c_char),
('bytes', ctypes.c_short)]
# C interface class
class CTaosInterface(object):
libtaos = ctypes.windll.LoadLibrary('taos')
......@@ -216,7 +275,7 @@ class CTaosInterface(object):
except AttributeError:
raise AttributeError("config is expected as a str")
if config != None:
if config is not None:
CTaosInterface.libtaos.taos_options(3, self._config)
CTaosInterface.libtaos.taos_init()
......@@ -227,7 +286,13 @@ class CTaosInterface(object):
"""
return self._config
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
def connect(
self,
host=None,
user="root",
password="taosdata",
db=None,
port=0):
'''
Function to connect to server
......@@ -236,7 +301,7 @@ class CTaosInterface(object):
# host
try:
_host = ctypes.c_char_p(host.encode(
"utf-8")) if host != None else ctypes.c_char_p(None)
"utf-8")) if host is not None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("host is expected as a str")
......@@ -255,7 +320,7 @@ class CTaosInterface(object):
# db
try:
_db = ctypes.c_char_p(
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
db.encode("utf-8")) if db is not None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("db is expected as a str")
......@@ -268,11 +333,11 @@ class CTaosInterface(object):
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
_host, _user, _password, _db, _port))
if connection.value == None:
if connection.value is None:
print('connect to TDengine failed')
raise ConnectionError("connect to TDengine failed")
# sys.exit(1)
#else:
# else:
# print('connect to TDengine success')
return connection
......@@ -293,7 +358,8 @@ class CTaosInterface(object):
@rtype: 0 on success and -1 on failure
'''
try:
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
return CTaosInterface.libtaos.taos_query(
connection, ctypes.c_char_p(sql.encode('utf-8')))
except AttributeError:
raise AttributeError("sql is expected as a string")
# finally:
......@@ -360,35 +426,49 @@ class CTaosInterface(object):
result, ctypes.byref(pblock))
if num_of_rows == 0:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
isMicro = (CTaosInterface.libtaos.taos_result_precision(
result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
fieldLen = [
ele for ele in ctypes.cast(
fieldL, ctypes.POINTER(
ctypes.c_int))[
:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC_BLOCK:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC_BLOCK[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
blocks[i] = _CONVERT_FUNC_BLOCK[fields[i]['type']](
data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def fetchRow(result, fields):
pblock = ctypes.c_void_p(0)
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
if pblock:
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
isMicro = (CTaosInterface.libtaos.taos_result_precision(
result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
fieldLen = [
ele for ele in ctypes.cast(
fieldL, ctypes.POINTER(
ctypes.c_int))[
:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
raise DatabaseError(
"Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
blocks[i] = _CONVERT_FUNC[fields[i]['type']](
data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
return blocks, abs(num_of_rows)
......
......@@ -2,9 +2,11 @@ from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
......@@ -44,7 +46,12 @@ class TDengineConnection(object):
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
......@@ -56,7 +63,8 @@ class TDengineConnection(object):
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
......@@ -81,6 +89,7 @@ class TDengineConnection(object):
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
......
......@@ -3,6 +3,7 @@
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
......@@ -18,13 +19,21 @@ class FieldType(object):
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
......
......@@ -5,6 +5,7 @@ import threading
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
......@@ -50,7 +51,8 @@ class TDengineCursor(object):
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(self._result, self._fields)
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
......@@ -117,8 +119,9 @@ class TDengineCursor(object):
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._result)
return CTaosInterface.affectedRows(self._result )
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
......@@ -147,10 +150,13 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields)
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
......@@ -165,20 +171,21 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
if num_of_fields == 0: break
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
......@@ -209,6 +216,7 @@ class TDengineCursor(object):
"""
self._description = []
for ele in self._fields:
self._description.append((ele['name'], ele['type'], None, None, None, None, False))
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
......@@ -4,6 +4,7 @@
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
......@@ -16,19 +17,24 @@ class DBAPITypeObject(object):
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
......
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
......@@ -10,26 +11,31 @@ class Error(Exception):
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
......@@ -41,16 +47,19 @@ class IntegrityError(DatabaseError):
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
......
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
......@@ -18,15 +19,15 @@ class TDengineSubscription(object):
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress = True):
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
......@@ -38,12 +39,16 @@ class TDengineSubscription(object):
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10):
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
......
......@@ -8,6 +8,7 @@ paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
......
......@@ -4,11 +4,14 @@ from .error import *
import math
import datetime
def _convert_millisecond_to_datetime(milli):
return datetime.datetime.fromtimestamp(milli/1000.0)
return datetime.datetime.fromtimestamp(milli / 1000.0)
def _convert_microsecond_to_datetime(micro):
return datetime.datetime.fromtimestamp(micro/1000000.0)
return datetime.datetime.fromtimestamp(micro / 1000000.0)
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
......@@ -18,74 +21,113 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
else:
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_bool))[
:abs(num_of_rows)]]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]]
else:
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
else:
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
if num_of_rows > 0:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
else:
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
......@@ -98,90 +140,107 @@ def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append( tmpstr.value.decode() )
res.append(tmpstr.value.decode())
else:
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
res.append((ctypes.cast(data + nbytes * i,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
return res
def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
res=[]
res = []
if num_of_rows > 0:
for i in range(abs(num_of_rows)):
try:
rbyte=ctypes.cast(data+nbytes*i,ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode()[0:rbyte] )
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
rbyte=ctypes.cast(data+nbytes*i,ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode()[0:rbyte] )
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C nchar row to python row
"""
assert(nbytes is not None)
res=[]
res = []
if num_of_rows >= 0:
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data+nbytes*i+2)
res.append( tmpstr.value.decode() )
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
res.append( (ctypes.cast(data+nbytes*i+2, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
res.append((ctypes.cast(data + nbytes * i + 2,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
return res
_CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python
}
_CONVERT_FUNC_BLOCK = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT : _crow_tinyint_to_python,
FieldType.C_SMALLINT : _crow_smallint_to_python,
FieldType.C_INT : _crow_int_to_python,
FieldType.C_BIGINT : _crow_bigint_to_python,
FieldType.C_FLOAT : _crow_float_to_python,
FieldType.C_DOUBLE : _crow_double_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python_block,
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
FieldType.C_NCHAR : _crow_nchar_to_python_block
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python_block
}
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [('name', ctypes.c_char * 65),
('type', ctypes.c_char),
('bytes', ctypes.c_short)]
# C interface class
class CTaosInterface(object):
libtaos = ctypes.windll.LoadLibrary('taos')
......@@ -218,7 +277,7 @@ class CTaosInterface(object):
except AttributeError:
raise AttributeError("config is expected as a str")
if config != None:
if config is not None:
CTaosInterface.libtaos.taos_options(3, self._config)
CTaosInterface.libtaos.taos_init()
......@@ -229,7 +288,13 @@ class CTaosInterface(object):
"""
return self._config
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
def connect(
self,
host=None,
user="root",
password="taosdata",
db=None,
port=0):
'''
Function to connect to server
......@@ -238,7 +303,7 @@ class CTaosInterface(object):
# host
try:
_host = ctypes.c_char_p(host.encode(
"utf-8")) if host != None else ctypes.c_char_p(None)
"utf-8")) if host is not None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("host is expected as a str")
......@@ -257,7 +322,7 @@ class CTaosInterface(object):
# db
try:
_db = ctypes.c_char_p(
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
db.encode("utf-8")) if db is not None else ctypes.c_char_p(None)
except AttributeError:
raise AttributeError("db is expected as a str")
......@@ -270,11 +335,11 @@ class CTaosInterface(object):
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
_host, _user, _password, _db, _port))
if connection.value == None:
if connection.value is None:
print('connect to TDengine failed')
raise ConnectionError("connect to TDengine failed")
# sys.exit(1)
#else:
# else:
# print('connect to TDengine success')
return connection
......@@ -295,7 +360,8 @@ class CTaosInterface(object):
@rtype: 0 on success and -1 on failure
'''
try:
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
return CTaosInterface.libtaos.taos_query(
connection, ctypes.c_char_p(sql.encode('utf-8')))
except AttributeError:
raise AttributeError("sql is expected as a string")
# finally:
......@@ -362,35 +428,49 @@ class CTaosInterface(object):
result, ctypes.byref(pblock))
if num_of_rows == 0:
return None, 0
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
isMicro = (CTaosInterface.libtaos.taos_result_precision(
result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
fieldLen = [
ele for ele in ctypes.cast(
fieldL, ctypes.POINTER(
ctypes.c_int))[
:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC_BLOCK:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = _CONVERT_FUNC_BLOCK[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
blocks[i] = _CONVERT_FUNC_BLOCK[fields[i]['type']](
data, num_of_rows, fieldLen[i], isMicro)
return blocks, abs(num_of_rows)
@staticmethod
def fetchRow(result, fields):
pblock = ctypes.c_void_p(0)
pblock = CTaosInterface.libtaos.taos_fetch_row(result)
if pblock :
if pblock:
num_of_rows = 1
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
isMicro = (CTaosInterface.libtaos.taos_result_precision(
result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
fieldLen = [
ele for ele in ctypes.cast(
fieldL, ctypes.POINTER(
ctypes.c_int))[
:len(fields)]]
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
raise DatabaseError(
"Invalid data type returned from database")
if data is None:
blocks[i] = [None]
else:
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)
blocks[i] = _CONVERT_FUNC[fields[i]['type']](
data, num_of_rows, fieldLen[i], isMicro)
else:
return None, 0
return blocks, abs(num_of_rows)
......
......@@ -2,9 +2,11 @@ from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
......@@ -44,7 +46,12 @@ class TDengineConnection(object):
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
......@@ -56,7 +63,8 @@ class TDengineConnection(object):
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
......@@ -81,6 +89,7 @@ class TDengineConnection(object):
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
......
......@@ -3,6 +3,7 @@
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
......@@ -18,13 +19,21 @@ class FieldType(object):
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
......
......@@ -51,7 +51,8 @@ class TDengineCursor(object):
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(self._result, self._fields)
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
......@@ -118,13 +119,14 @@ class TDengineCursor(object):
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._result )
return CTaosInterface.affectedRows(self._result )
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result )
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._result ), errno)
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......@@ -148,10 +150,13 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields)
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
......@@ -166,20 +171,21 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
if num_of_fields == 0: break
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
......@@ -210,6 +216,7 @@ class TDengineCursor(object):
"""
self._description = []
for ele in self._fields:
self._description.append((ele['name'], ele['type'], None, None, None, None, False))
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
......@@ -4,6 +4,7 @@
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
......@@ -16,19 +17,24 @@ class DBAPITypeObject(object):
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
......
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
......@@ -10,26 +11,31 @@ class Error(Exception):
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
......@@ -41,16 +47,19 @@ class IntegrityError(DatabaseError):
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
......
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
......@@ -18,15 +19,15 @@ class TDengineSubscription(object):
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress = True):
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
......@@ -38,12 +39,16 @@ class TDengineSubscription(object):
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10):
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
......
......@@ -26,7 +26,7 @@ class TDTestCase:
tdSql.prepare()
ret = tdSql.execute(
'create table tb (ts timestamp, speed int unsigned)')
'create table tb (ts timestamp, speed tinyint unsigned)')
insertRows = 10
tdLog.info("insert %d rows" % (insertRows))
......@@ -38,11 +38,11 @@ class TDTestCase:
tdLog.info("insert earlier data")
tdSql.execute('insert into tb values (now - 5m , 10)')
tdSql.execute('insert into tb values (now - 6m , 10)')
tdSql.execute('insert into tb values (now - 7m , 10)')
tdSql.execute('insert into tb values (now - 8m , 4294967294)')
tdSql.execute('insert into tb values (now - 7m , NULL)')
tdSql.execute('insert into tb values (now - 8m , 254)')
tdSql.error('insert into tb values (now - 9m, -1)')
tdSql.error('insert into tb values (now - 9m, 4294967295)')
tdSql.error('insert into tb values (now - 9m, 255)')
tdSql.query("select * from tb")
tdSql.checkRows(insertRows + 4)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册