From a7eaa8ad7e617e6fa2999b5e80a0a178c0cd077a Mon Sep 17 00:00:00 2001 From: localvar Date: Tue, 14 Jan 2020 10:50:30 +0800 Subject: [PATCH] tbase-1470: python2&3, windows --- .../python/windows/python2/taos/cinterface.py | 65 +- .../python/windows/python2/taos/connection.py | 13 +- .../windows/python2/taos/subscription.py | 52 ++ .../python/windows/python3/taos/cinterface.py | 775 +++++++++--------- .../python/windows/python3/taos/connection.py | 168 ++-- .../windows/python3/taos/subscription.py | 52 ++ 6 files changed, 668 insertions(+), 457 deletions(-) create mode 100644 src/connector/python/windows/python2/taos/subscription.py create mode 100644 src/connector/python/windows/python3/taos/subscription.py diff --git a/src/connector/python/windows/python2/taos/cinterface.py b/src/connector/python/windows/python2/taos/cinterface.py index 8e3b701929..f8cdfcc51e 100644 --- a/src/connector/python/windows/python2/taos/cinterface.py +++ b/src/connector/python/windows/python2/taos/cinterface.py @@ -13,14 +13,14 @@ def _convert_microsecond_to_datetime(micro): def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): """Function to convert C bool row to python row """ - _timstamp_converter = _convert_millisecond_to_datetime + _timestamp_converter = _convert_millisecond_to_datetime if micro: - _timstamp_converter = _convert_microsecond_to_datetime + _timestamp_converter = _convert_microsecond_to_datetime if num_of_rows > 0: - return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1])) + return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1])) else: - return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)])) + return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)])) def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): """Function to convert C bool row to python row @@ -144,6 +144,8 @@ class CTaosInterface(object): libtaos.taos_use_result.restype = ctypes.c_void_p libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) libtaos.taos_errstr.restype = ctypes.c_char_p + libtaos.taos_subscribe.restype = ctypes.c_void_p + libtaos.taos_consume.restype = ctypes.c_void_p def __init__(self, config=None): ''' @@ -252,6 +254,41 @@ class CTaosInterface(object): """ return CTaosInterface.libtaos.taos_affected_rows(connection) + @staticmethod + def subscribe(connection, restart, topic, sql, interval): + """Create a subscription + @restart boolean, + @sql string, sql statement for data query, must be a 'select' statement. + @topic string, name of this subscription + """ + return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe( + connection, + 1 if restart else 0, + ctypes.c_char_p(topic.encode('utf-8')), + ctypes.c_char_p(sql.encode('utf-8')), + None, + None, + interval)) + + @staticmethod + def consume(sub): + """Consume data of a subscription + """ + result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub)) + fields = [] + pfields = CTaosInterface.fetchFields(result) + for i in range(CTaosInterface.libtaos.taos_num_fields(result)): + fields.append({'name': pfields[i].name.decode('utf-8'), + 'bytes': pfields[i].bytes, + 'type': ord(pfields[i].type)}) + return result, fields + + @staticmethod + def unsubscribe(sub, keepProgress): + """Cancel a subscription + """ + CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0) + @staticmethod def useResult(connection): '''Use result after calling self.query @@ -275,8 +312,8 @@ class CTaosInterface(object): if num_of_rows == 0: return None, 0 - blocks = [None] * len(fields) isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO) + blocks = [None] * len(fields) for i in range(len(fields)): data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] @@ -351,4 +388,20 @@ class CTaosInterface(object): def errStr(connection): """Return the error styring """ - return CTaosInterface.libtaos.taos_errstr(connection) \ No newline at end of file + return CTaosInterface.libtaos.taos_errstr(connection) + + +if __name__ == '__main__': + cinter = CTaosInterface() + conn = cinter.connect() + + print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) + print('Affected rows: {}'.format(cinter.affectedRows(conn))) + + result, des = CTaosInterface.useResult(conn) + + data, num_of_rows = CTaosInterface.fetchBlock(result, des) + + print(data) + + cinter.close(conn) \ No newline at end of file diff --git a/src/connector/python/windows/python2/taos/connection.py b/src/connector/python/windows/python2/taos/connection.py index ba24209552..e2783975d9 100644 --- a/src/connector/python/windows/python2/taos/connection.py +++ b/src/connector/python/windows/python2/taos/connection.py @@ -1,5 +1,5 @@ -# from .cursor import TDengineCursor from .cursor import TDengineCursor +from .subscription import TDengineSubscription from .cinterface import CTaosInterface class TDengineConnection(object): @@ -15,7 +15,8 @@ class TDengineConnection(object): self._config = None self._chandle = None - self.config(**kwargs) + if len(kwargs) > 0: + self.config(**kwargs) def config(self, **kwargs): # host @@ -50,6 +51,14 @@ class TDengineConnection(object): """ return CTaosInterface.close(self._conn) + def subscribe(self, restart, topic, sql, interval): + """Create a subscription. + """ + if self._conn is None: + return None + sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval) + return TDengineSubscription(sub) + def cursor(self): """Return a new Cursor object using the connection. """ diff --git a/src/connector/python/windows/python2/taos/subscription.py b/src/connector/python/windows/python2/taos/subscription.py new file mode 100644 index 0000000000..3af989a138 --- /dev/null +++ b/src/connector/python/windows/python2/taos/subscription.py @@ -0,0 +1,52 @@ +from .cinterface import CTaosInterface +from .error import * + +class TDengineSubscription(object): + """TDengine subscription object + """ + def __init__(self, sub): + self._sub = sub + + + def consume(self): + """Consume rows of a subscription + """ + if self._sub is None: + raise OperationalError("Invalid use of consume") + + result, fields = CTaosInterface.consume(self._sub) + buffer = [[] for i in range(len(fields))] + print(buffer) + while True: + block, num_of_fields = CTaosInterface.fetchBlock(result, fields) + if num_of_fields == 0: break + for i in range(len(fields)): + buffer[i].extend(block[i]) + + return list(map(tuple, zip(*buffer))) + + + def close(self, keepProgress = True): + """Close the Subscription. + """ + if self._sub is None: + return False + + CTaosInterface.unsubscribe(self._sub, keepProgress) + return True + + +if __name__ == '__main__': + from .connection import TDengineConnection + conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test") + + # Generate a cursor object to run SQL commands + sub = conn.subscribe(True, "test", "select * from meters;", 1000) + + for i in range(0,10): + data = sub.consume() + for d in data: + print(d) + + sub.close() + conn.close() \ No newline at end of file diff --git a/src/connector/python/windows/python3/taos/cinterface.py b/src/connector/python/windows/python3/taos/cinterface.py index 2cddf5fccf..b4b44e199c 100644 --- a/src/connector/python/windows/python3/taos/cinterface.py +++ b/src/connector/python/windows/python3/taos/cinterface.py @@ -1,370 +1,407 @@ -import ctypes -from .constants import FieldType -from .error import * -import math -import datetime - -def _convert_millisecond_to_datetime(milli): - return datetime.datetime.fromtimestamp(milli/1000.0) - -def _convert_microsecond_to_datetime(micro): - return datetime.datetime.fromtimestamp(micro/1000000.0) - -def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C bool row to python row - """ - _timestamp_converter = _convert_millisecond_to_datetime - if micro: - _timestamp_converter = _convert_microsecond_to_datetime - - if num_of_rows > 0: - return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1])) - else: - return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)])) - -def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C bool row to python row - """ - if num_of_rows > 0: - return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ] - else: - return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ] - -def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C tinyint row to python row - """ - if num_of_rows > 0: - return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ] - else: - return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ] - -def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C smallint row to python row - """ - if num_of_rows > 0: - return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]] - else: - return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ] - -def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C int row to python row - """ - if num_of_rows > 0: - return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ] - else: - return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ] - -def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C bigint row to python row - """ - if num_of_rows > 0: - return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ] - else: - return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ] - -def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C float row to python row - """ - if num_of_rows > 0: - return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ] - else: - return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ] - -def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C double row to python row - """ - if num_of_rows > 0: - return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ] - else: - return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ] - -def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C binary row to python row - """ - if num_of_rows > 0: - return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]] - else: - return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]] - -def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False): - """Function to convert C nchar row to python row - """ - assert(nbytes is not None) - - res = [] - - for i in range(abs(num_of_rows)): - try: - if num_of_rows >= 0: - res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) - else: - res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) - except ValueError: - res.append(None) - - return res - # if num_of_rows > 0: - # for i in range(abs(num_of_rows)): - # try: - # res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) - # except ValueError: - # res.append(None) - # return res - # # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]] - # else: - # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]] - -_CONVERT_FUNC = { - FieldType.C_BOOL: _crow_bool_to_python, - FieldType.C_TINYINT : _crow_tinyint_to_python, - FieldType.C_SMALLINT : _crow_smallint_to_python, - FieldType.C_INT : _crow_int_to_python, - FieldType.C_BIGINT : _crow_bigint_to_python, - FieldType.C_FLOAT : _crow_float_to_python, - FieldType.C_DOUBLE : _crow_double_to_python, - FieldType.C_BINARY: _crow_binary_to_python, - FieldType.C_TIMESTAMP : _crow_timestamp_to_python, - FieldType.C_NCHAR : _crow_nchar_to_python -} - -# Corresponding TAOS_FIELD structure in C -class TaosField(ctypes.Structure): - _fields_ = [('name', ctypes.c_char * 64), - ('bytes', ctypes.c_short), - ('type', ctypes.c_char)] - -# C interface class -class CTaosInterface(object): - - libtaos = ctypes.windll.LoadLibrary('taos') - - libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) - libtaos.taos_init.restype = None - libtaos.taos_connect.restype = ctypes.c_void_p - libtaos.taos_use_result.restype = ctypes.c_void_p - libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) - libtaos.taos_errstr.restype = ctypes.c_char_p - - def __init__(self, config=None): - ''' - Function to initialize the class - @host : str, hostname to connect - @user : str, username to connect to server - @password : str, password to connect to server - @db : str, default db to use when log in - @config : str, config directory - - @rtype : None - ''' - if config is None: - self._config = ctypes.c_char_p(None) - else: - try: - self._config = ctypes.c_char_p(config.encode('utf-8')) - except AttributeError: - raise AttributeError("config is expected as a str") - - if config != None: - CTaosInterface.libtaos.taos_options(3, self._config) - - CTaosInterface.libtaos.taos_init() - - @property - def config(self): - """ Get current config - """ - return self._config - - def connect(self, host=None, user="root", password="taosdata", db=None, port=0): - ''' - Function to connect to server - - @rtype: c_void_p, TDengine handle - ''' - # host - try: - _host = ctypes.c_char_p(host.encode( - "utf-8")) if host != None else ctypes.c_char_p(None) - except AttributeError: - raise AttributeError("host is expected as a str") - - # user - try: - _user = ctypes.c_char_p(user.encode("utf-8")) - except AttributeError: - raise AttributeError("user is expected as a str") - - # password - try: - _password = ctypes.c_char_p(password.encode("utf-8")) - except AttributeError: - raise AttributeError("password is expected as a str") - - # db - try: - _db = ctypes.c_char_p( - db.encode("utf-8")) if db != None else ctypes.c_char_p(None) - except AttributeError: - raise AttributeError("db is expected as a str") - - # port - try: - _port = ctypes.c_int(port) - except TypeError: - raise TypeError("port is expected as an int") - - connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect( - _host, _user, _password, _db, _port)) - - if connection.value == None: - print('connect to TDengine failed') - # sys.exit(1) - else: - print('connect to TDengine success') - - return connection - - @staticmethod - def close(connection): - '''Close the TDengine handle - ''' - CTaosInterface.libtaos.taos_close(connection) - print('connection is closed') - - @staticmethod - def query(connection, sql): - '''Run SQL - - @sql: str, sql string to run - - @rtype: 0 on success and -1 on failure - ''' - try: - return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8'))) - except AttributeError: - raise AttributeError("sql is expected as a string") - # finally: - # CTaosInterface.libtaos.close(connection) - - @staticmethod - def affectedRows(connection): - """The affected rows after runing query - """ - return CTaosInterface.libtaos.taos_affected_rows(connection) - - @staticmethod - def useResult(connection): - '''Use result after calling self.query - ''' - result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection)) - fields = [] - pfields = CTaosInterface.fetchFields(result) - for i in range(CTaosInterface.fieldsCount(connection)): - fields.append({'name': pfields[i].name.decode('utf-8'), - 'bytes': pfields[i].bytes, - 'type': ord(pfields[i].type)}) - - return result, fields - - @staticmethod - def fetchBlock(result, fields): - pblock = ctypes.c_void_p(0) - num_of_rows = CTaosInterface.libtaos.taos_fetch_block( - result, ctypes.byref(pblock)) - - if num_of_rows == 0: - return None, 0 - - isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO) - blocks = [None] * len(fields) - for i in range(len(fields)): - data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] - - if fields[i]['type'] not in _CONVERT_FUNC: - raise DatabaseError("Invalid data type returned from database") - - blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro) - - return blocks, abs(num_of_rows) - - @staticmethod - def freeResult(result): - CTaosInterface.libtaos.taos_free_result(result) - result.value = None - - @staticmethod - def fieldsCount(connection): - return CTaosInterface.libtaos.taos_field_count(connection) - - @staticmethod - def fetchFields(result): - return CTaosInterface.libtaos.taos_fetch_fields(result) - - # @staticmethod - # def fetchRow(result, fields): - # l = [] - # row = CTaosInterface.libtaos.taos_fetch_row(result) - # if not row: - # return None - - # for i in range(len(fields)): - # l.append(CTaosInterface.getDataValue( - # row[i], fields[i]['type'], fields[i]['bytes'])) - - # return tuple(l) - - # @staticmethod - # def getDataValue(data, dtype, byte): - # ''' - # ''' - # if not data: - # return None - - # if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): - # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): - # return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] - # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): - # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') - - @staticmethod - def errno(connection): - """Return the error number. - """ - return CTaosInterface.libtaos.taos_errno(connection) - - @staticmethod - def errStr(connection): - """Return the error styring - """ - return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8') - - -if __name__ == '__main__': - cinter = CTaosInterface() - conn = cinter.connect() - - print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) - print('Affected rows: {}'.format(cinter.affectedRows(conn))) - - result, des = CTaosInterface.useResult(conn) - - data, num_of_rows = CTaosInterface.fetchBlock(result, des) - - print(data) - +import ctypes +from .constants import FieldType +from .error import * +import math +import datetime + +def _convert_millisecond_to_datetime(milli): + return datetime.datetime.fromtimestamp(milli/1000.0) + +def _convert_microsecond_to_datetime(micro): + return datetime.datetime.fromtimestamp(micro/1000000.0) + +def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C bool row to python row + """ + _timestamp_converter = _convert_millisecond_to_datetime + if micro: + _timestamp_converter = _convert_microsecond_to_datetime + + if num_of_rows > 0: + return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1])) + else: + return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)])) + +def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C bool row to python row + """ + if num_of_rows > 0: + return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ] + else: + return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ] + +def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C tinyint row to python row + """ + if num_of_rows > 0: + return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ] + else: + return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ] + +def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C smallint row to python row + """ + if num_of_rows > 0: + return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]] + else: + return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ] + +def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C int row to python row + """ + if num_of_rows > 0: + return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ] + else: + return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ] + +def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C bigint row to python row + """ + if num_of_rows > 0: + return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ] + else: + return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ] + +def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C float row to python row + """ + if num_of_rows > 0: + return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ] + else: + return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ] + +def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C double row to python row + """ + if num_of_rows > 0: + return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ] + else: + return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ] + +def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C binary row to python row + """ + if num_of_rows > 0: + return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]] + else: + return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]] + +def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False): + """Function to convert C nchar row to python row + """ + assert(nbytes is not None) + + res = [] + + for i in range(abs(num_of_rows)): + try: + if num_of_rows >= 0: + res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) + else: + res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) + except ValueError: + res.append(None) + + return res + # if num_of_rows > 0: + # for i in range(abs(num_of_rows)): + # try: + # res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value ) + # except ValueError: + # res.append(None) + # return res + # # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]] + # else: + # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]] + +_CONVERT_FUNC = { + FieldType.C_BOOL: _crow_bool_to_python, + FieldType.C_TINYINT : _crow_tinyint_to_python, + FieldType.C_SMALLINT : _crow_smallint_to_python, + FieldType.C_INT : _crow_int_to_python, + FieldType.C_BIGINT : _crow_bigint_to_python, + FieldType.C_FLOAT : _crow_float_to_python, + FieldType.C_DOUBLE : _crow_double_to_python, + FieldType.C_BINARY: _crow_binary_to_python, + FieldType.C_TIMESTAMP : _crow_timestamp_to_python, + FieldType.C_NCHAR : _crow_nchar_to_python +} + +# Corresponding TAOS_FIELD structure in C +class TaosField(ctypes.Structure): + _fields_ = [('name', ctypes.c_char * 64), + ('bytes', ctypes.c_short), + ('type', ctypes.c_char)] + +# C interface class +class CTaosInterface(object): + + libtaos = ctypes.windll.LoadLibrary('taos') + + libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) + libtaos.taos_init.restype = None + libtaos.taos_connect.restype = ctypes.c_void_p + libtaos.taos_use_result.restype = ctypes.c_void_p + libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) + libtaos.taos_errstr.restype = ctypes.c_char_p + libtaos.taos_subscribe.restype = ctypes.c_void_p + libtaos.taos_consume.restype = ctypes.c_void_p + + def __init__(self, config=None): + ''' + Function to initialize the class + @host : str, hostname to connect + @user : str, username to connect to server + @password : str, password to connect to server + @db : str, default db to use when log in + @config : str, config directory + + @rtype : None + ''' + if config is None: + self._config = ctypes.c_char_p(None) + else: + try: + self._config = ctypes.c_char_p(config.encode('utf-8')) + except AttributeError: + raise AttributeError("config is expected as a str") + + if config != None: + CTaosInterface.libtaos.taos_options(3, self._config) + + CTaosInterface.libtaos.taos_init() + + @property + def config(self): + """ Get current config + """ + return self._config + + def connect(self, host=None, user="root", password="taosdata", db=None, port=0): + ''' + Function to connect to server + + @rtype: c_void_p, TDengine handle + ''' + # host + try: + _host = ctypes.c_char_p(host.encode( + "utf-8")) if host != None else ctypes.c_char_p(None) + except AttributeError: + raise AttributeError("host is expected as a str") + + # user + try: + _user = ctypes.c_char_p(user.encode("utf-8")) + except AttributeError: + raise AttributeError("user is expected as a str") + + # password + try: + _password = ctypes.c_char_p(password.encode("utf-8")) + except AttributeError: + raise AttributeError("password is expected as a str") + + # db + try: + _db = ctypes.c_char_p( + db.encode("utf-8")) if db != None else ctypes.c_char_p(None) + except AttributeError: + raise AttributeError("db is expected as a str") + + # port + try: + _port = ctypes.c_int(port) + except TypeError: + raise TypeError("port is expected as an int") + + connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect( + _host, _user, _password, _db, _port)) + + if connection.value == None: + print('connect to TDengine failed') + # sys.exit(1) + else: + print('connect to TDengine success') + + return connection + + @staticmethod + def close(connection): + '''Close the TDengine handle + ''' + CTaosInterface.libtaos.taos_close(connection) + print('connection is closed') + + @staticmethod + def query(connection, sql): + '''Run SQL + + @sql: str, sql string to run + + @rtype: 0 on success and -1 on failure + ''' + try: + return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8'))) + except AttributeError: + raise AttributeError("sql is expected as a string") + # finally: + # CTaosInterface.libtaos.close(connection) + + @staticmethod + def affectedRows(connection): + """The affected rows after runing query + """ + return CTaosInterface.libtaos.taos_affected_rows(connection) + + @staticmethod + def subscribe(connection, restart, topic, sql, interval): + """Create a subscription + @restart boolean, + @sql string, sql statement for data query, must be a 'select' statement. + @topic string, name of this subscription + """ + return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe( + connection, + 1 if restart else 0, + ctypes.c_char_p(topic.encode('utf-8')), + ctypes.c_char_p(sql.encode('utf-8')), + None, + None, + interval)) + + @staticmethod + def consume(sub): + """Consume data of a subscription + """ + result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub)) + fields = [] + pfields = CTaosInterface.fetchFields(result) + for i in range(CTaosInterface.libtaos.taos_num_fields(result)): + fields.append({'name': pfields[i].name.decode('utf-8'), + 'bytes': pfields[i].bytes, + 'type': ord(pfields[i].type)}) + return result, fields + + @staticmethod + def unsubscribe(sub, keepProgress): + """Cancel a subscription + """ + CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0) + + @staticmethod + def useResult(connection): + '''Use result after calling self.query + ''' + result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection)) + fields = [] + pfields = CTaosInterface.fetchFields(result) + for i in range(CTaosInterface.fieldsCount(connection)): + fields.append({'name': pfields[i].name.decode('utf-8'), + 'bytes': pfields[i].bytes, + 'type': ord(pfields[i].type)}) + + return result, fields + + @staticmethod + def fetchBlock(result, fields): + pblock = ctypes.c_void_p(0) + num_of_rows = CTaosInterface.libtaos.taos_fetch_block( + result, ctypes.byref(pblock)) + + if num_of_rows == 0: + return None, 0 + + isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO) + blocks = [None] * len(fields) + for i in range(len(fields)): + data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i] + + if fields[i]['type'] not in _CONVERT_FUNC: + raise DatabaseError("Invalid data type returned from database") + + blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro) + + return blocks, abs(num_of_rows) + + @staticmethod + def freeResult(result): + CTaosInterface.libtaos.taos_free_result(result) + result.value = None + + @staticmethod + def fieldsCount(connection): + return CTaosInterface.libtaos.taos_field_count(connection) + + @staticmethod + def fetchFields(result): + return CTaosInterface.libtaos.taos_fetch_fields(result) + + # @staticmethod + # def fetchRow(result, fields): + # l = [] + # row = CTaosInterface.libtaos.taos_fetch_row(result) + # if not row: + # return None + + # for i in range(len(fields)): + # l.append(CTaosInterface.getDataValue( + # row[i], fields[i]['type'], fields[i]['bytes'])) + + # return tuple(l) + + # @staticmethod + # def getDataValue(data, dtype, byte): + # ''' + # ''' + # if not data: + # return None + + # if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): + # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): + # return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] + # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): + # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') + + @staticmethod + def errno(connection): + """Return the error number. + """ + return CTaosInterface.libtaos.taos_errno(connection) + + @staticmethod + def errStr(connection): + """Return the error styring + """ + return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8') + + +if __name__ == '__main__': + cinter = CTaosInterface() + conn = cinter.connect() + + print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) + print('Affected rows: {}'.format(cinter.affectedRows(conn))) + + result, des = CTaosInterface.useResult(conn) + + data, num_of_rows = CTaosInterface.fetchBlock(result, des) + + print(data) + cinter.close(conn) \ No newline at end of file diff --git a/src/connector/python/windows/python3/taos/connection.py b/src/connector/python/windows/python3/taos/connection.py index a88e25a6db..e2783975d9 100644 --- a/src/connector/python/windows/python3/taos/connection.py +++ b/src/connector/python/windows/python3/taos/connection.py @@ -1,81 +1,89 @@ -# from .cursor import TDengineCursor -from .cursor import TDengineCursor -from .cinterface import CTaosInterface - -class TDengineConnection(object): - """ TDengine connection object - """ - def __init__(self, *args, **kwargs): - self._conn = None - self._host = None - self._user = "root" - self._password = "taosdata" - self._database = None - self._port = 0 - self._config = None - self._chandle = None - - if len(kwargs) > 0: - self.config(**kwargs) - - def config(self, **kwargs): - # host - if 'host' in kwargs: - self._host = kwargs['host'] - - # user - if 'user' in kwargs: - self._user = kwargs['user'] - - # password - if 'password' in kwargs: - self._password = kwargs['password'] - - # database - if 'database' in kwargs: - self._database = kwargs['database'] - - # port - if 'port' in kwargs: - self._port = kwargs['port'] - - # config - if 'config' in kwargs: - self._config = kwargs['config'] - - self._chandle = CTaosInterface(self._config) - self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) - - def close(self): - """Close current connection. - """ - return CTaosInterface.close(self._conn) - - def cursor(self): - """Return a new Cursor object using the connection. - """ - return TDengineCursor(self) - - def commit(self): - """Commit any pending transaction to the database. - - Since TDengine do not support transactions, the implement is void functionality. - """ - pass - - def rollback(self): - """Void functionality - """ - pass - - def clear_result_set(self): - """Clear unused result set on this connection. - """ - result = self._chandle.useResult(self._conn)[0] - if result: - self._chandle.freeResult(result) - -if __name__ == "__main__": - conn = TDengineConnection(host='192.168.1.107') - conn.close() +from .cursor import TDengineCursor +from .subscription import TDengineSubscription +from .cinterface import CTaosInterface + +class TDengineConnection(object): + """ TDengine connection object + """ + def __init__(self, *args, **kwargs): + self._conn = None + self._host = None + self._user = "root" + self._password = "taosdata" + self._database = None + self._port = 0 + self._config = None + self._chandle = None + + if len(kwargs) > 0: + self.config(**kwargs) + + def config(self, **kwargs): + # host + if 'host' in kwargs: + self._host = kwargs['host'] + + # user + if 'user' in kwargs: + self._user = kwargs['user'] + + # password + if 'password' in kwargs: + self._password = kwargs['password'] + + # database + if 'database' in kwargs: + self._database = kwargs['database'] + + # port + if 'port' in kwargs: + self._port = kwargs['port'] + + # config + if 'config' in kwargs: + self._config = kwargs['config'] + + self._chandle = CTaosInterface(self._config) + self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) + + def close(self): + """Close current connection. + """ + return CTaosInterface.close(self._conn) + + def subscribe(self, restart, topic, sql, interval): + """Create a subscription. + """ + if self._conn is None: + return None + sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval) + return TDengineSubscription(sub) + + def cursor(self): + """Return a new Cursor object using the connection. + """ + return TDengineCursor(self) + + def commit(self): + """Commit any pending transaction to the database. + + Since TDengine do not support transactions, the implement is void functionality. + """ + pass + + def rollback(self): + """Void functionality + """ + pass + + def clear_result_set(self): + """Clear unused result set on this connection. + """ + result = self._chandle.useResult(self._conn)[0] + if result: + self._chandle.freeResult(result) + +if __name__ == "__main__": + conn = TDengineConnection(host='192.168.1.107') + conn.close() print("Hello world") \ No newline at end of file diff --git a/src/connector/python/windows/python3/taos/subscription.py b/src/connector/python/windows/python3/taos/subscription.py new file mode 100644 index 0000000000..3af989a138 --- /dev/null +++ b/src/connector/python/windows/python3/taos/subscription.py @@ -0,0 +1,52 @@ +from .cinterface import CTaosInterface +from .error import * + +class TDengineSubscription(object): + """TDengine subscription object + """ + def __init__(self, sub): + self._sub = sub + + + def consume(self): + """Consume rows of a subscription + """ + if self._sub is None: + raise OperationalError("Invalid use of consume") + + result, fields = CTaosInterface.consume(self._sub) + buffer = [[] for i in range(len(fields))] + print(buffer) + while True: + block, num_of_fields = CTaosInterface.fetchBlock(result, fields) + if num_of_fields == 0: break + for i in range(len(fields)): + buffer[i].extend(block[i]) + + return list(map(tuple, zip(*buffer))) + + + def close(self, keepProgress = True): + """Close the Subscription. + """ + if self._sub is None: + return False + + CTaosInterface.unsubscribe(self._sub, keepProgress) + return True + + +if __name__ == '__main__': + from .connection import TDengineConnection + conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test") + + # Generate a cursor object to run SQL commands + sub = conn.subscribe(True, "test", "select * from meters;", 1000) + + for i in range(0,10): + data = sub.consume() + for d in data: + print(d) + + sub.close() + conn.close() \ No newline at end of file -- GitLab