提交 6084f5a7 编写于 作者: T Tao Liu

[TD-314] modify python connector to adapt C API

上级 69432508
...@@ -142,12 +142,13 @@ class CTaosInterface(object): ...@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p #libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None): def __init__(self, config=None):
''' '''
...@@ -249,12 +250,12 @@ class CTaosInterface(object): ...@@ -249,12 +250,12 @@ class CTaosInterface(object):
raise AttributeError("sql is expected as a string") raise AttributeError("sql is expected as a string")
# finally: # finally:
# CTaosInterface.libtaos.close(connection) # CTaosInterface.libtaos.close(connection)
@staticmethod @staticmethod
def affectedRows(connection): def affectedRows(result):
"""The affected rows after runing query """The affected rows after runing query
""" """
return CTaosInterface.libtaos.taos_affected_rows(connection) return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod @staticmethod
def subscribe(connection, restart, topic, sql, interval): def subscribe(connection, restart, topic, sql, interval):
...@@ -292,18 +293,17 @@ class CTaosInterface(object): ...@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0) CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod @staticmethod
def useResult(connection): def useResult(result):
'''Use result after calling self.query '''Use result after calling self.query
''' '''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = [] fields = []
pfields = CTaosInterface.fetchFields(result) pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)): for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'), fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes, 'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)}) 'type': ord(pfields[i].type)})
return result, fields return fields
@staticmethod @staticmethod
def fetchBlock(result, fields): def fetchBlock(result, fields):
...@@ -337,8 +337,8 @@ class CTaosInterface(object): ...@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None result.value = None
@staticmethod @staticmethod
def fieldsCount(connection): def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(connection) return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod @staticmethod
def fetchFields(result): def fetchFields(result):
...@@ -386,29 +386,30 @@ class CTaosInterface(object): ...@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod @staticmethod
def errno(connection): def errno(result):
"""Return the error number. """Return the error number.
""" """
return CTaosInterface.libtaos.taos_errno(connection) return CTaosInterface.libtaos.taos_errno(result)
@staticmethod @staticmethod
def errStr(connection): def errStr(result):
"""Return the error styring """Return the error styring
""" """
return CTaosInterface.libtaos.taos_errstr(connection) return CTaosInterface.libtaos.taos_errstr(result)
if __name__ == '__main__': if __name__ == '__main__':
cinter = CTaosInterface() cinter = CTaosInterface()
conn = cinter.connect() conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
result, des = CTaosInterface.useResult(conn) fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des) data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data) print(data)
cinter.freeresult(result)
cinter.close(conn) cinter.close(conn)
\ No newline at end of file
...@@ -78,9 +78,7 @@ class TDengineConnection(object): ...@@ -78,9 +78,7 @@ class TDengineConnection(object):
def clear_result_set(self): def clear_result_set(self):
"""Clear unused result set on this connection. """Clear unused result set on this connection.
""" """
result = self._chandle.useResult(self._conn)[0] pass
if result:
self._chandle.freeResult(result)
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
......
...@@ -28,6 +28,6 @@ class FieldType(object): ...@@ -28,6 +28,6 @@ class FieldType(object):
C_FLOAT_NULL = float('nan') C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan') C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)]) C_BINARY_NULL = bytearray([int('0xff', 16)])
# Time precision definition # Timestamp precision definition
C_TIMESTAMP_MILLI = 0 C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1 C_TIMESTAMP_MICRO = 1
...@@ -116,25 +116,29 @@ class TDengineCursor(object): ...@@ -116,25 +116,29 @@ class TDengineCursor(object):
if params is not None: if params is not None:
pass pass
res = CTaosInterface.query(self._connection._conn, stmt) # global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile): if (self._logfile):
with open(self._logfile, "a") as logfile: with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation) logfile.write("%s;\n" % operation)
if res == 0: if self._result is not None:
if CTaosInterface.fieldsCount(self._connection._conn) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows( self._affected_rows += CTaosInterface.affectedRows(
self._connection._conn) self._result )
return CTaosInterface.affectedRows(self._connection._conn) return CTaosInterface.affectedRows(self._result )
else: else:
self._result, self._fields = CTaosInterface.useResult( self._fields = CTaosInterface.useResult(
self._connection._conn) self._result)
return self._handle_result() return self._handle_result()
else: else:
raise ProgrammingError( raise ProgrammingError(
CTaosInterface.errStr( CTaosInterface.errStr(
self._connection._conn)) self._result ))
def executemany(self, operation, seq_of_parameters): def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters. """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
...@@ -142,12 +142,13 @@ class CTaosInterface(object): ...@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p #libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None): def __init__(self, config=None):
''' '''
...@@ -251,10 +252,10 @@ class CTaosInterface(object): ...@@ -251,10 +252,10 @@ class CTaosInterface(object):
# CTaosInterface.libtaos.close(connection) # CTaosInterface.libtaos.close(connection)
@staticmethod @staticmethod
def affectedRows(connection): def affectedRows(result):
"""The affected rows after runing query """The affected rows after runing query
""" """
return CTaosInterface.libtaos.taos_affected_rows(connection) return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod @staticmethod
def subscribe(connection, restart, topic, sql, interval): def subscribe(connection, restart, topic, sql, interval):
...@@ -292,18 +293,17 @@ class CTaosInterface(object): ...@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0) CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod @staticmethod
def useResult(connection): def useResult(result):
'''Use result after calling self.query '''Use result after calling self.query
''' '''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = [] fields = []
pfields = CTaosInterface.fetchFields(result) pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)): for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'), fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes, 'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)}) 'type': ord(pfields[i].type)})
return result, fields return fields
@staticmethod @staticmethod
def fetchBlock(result, fields): def fetchBlock(result, fields):
...@@ -337,8 +337,8 @@ class CTaosInterface(object): ...@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None result.value = None
@staticmethod @staticmethod
def fieldsCount(connection): def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(connection) return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod @staticmethod
def fetchFields(result): def fetchFields(result):
...@@ -386,29 +386,30 @@ class CTaosInterface(object): ...@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod @staticmethod
def errno(connection): def errno(result):
"""Return the error number. """Return the error number.
""" """
return CTaosInterface.libtaos.taos_errno(connection) return CTaosInterface.libtaos.taos_errno(result)
@staticmethod @staticmethod
def errStr(connection): def errStr(result):
"""Return the error styring """Return the error styring
""" """
return CTaosInterface.libtaos.taos_errstr(connection) return CTaosInterface.libtaos.taos_errstr(result)
if __name__ == '__main__': if __name__ == '__main__':
cinter = CTaosInterface() cinter = CTaosInterface()
conn = cinter.connect() conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
result, des = CTaosInterface.useResult(conn) fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des) data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data) print(data)
cinter.freeresult(result)
cinter.close(conn) cinter.close(conn)
\ No newline at end of file
...@@ -79,9 +79,7 @@ class TDengineConnection(object): ...@@ -79,9 +79,7 @@ class TDengineConnection(object):
def clear_result_set(self): def clear_result_set(self):
"""Clear unused result set on this connection. """Clear unused result set on this connection.
""" """
result = self._chandle.useResult(self._conn)[0] pass
if result:
self._chandle.freeResult(result)
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
......
...@@ -109,16 +109,16 @@ class TDengineCursor(object): ...@@ -109,16 +109,16 @@ class TDengineCursor(object):
if params is not None: if params is not None:
pass pass
res = CTaosInterface.query(self._connection._conn, stmt) self._result = CTaosInterface.query(self._connection._conn, stmt)
if res == 0: if self._result is not None:
if CTaosInterface.fieldsCount(self._connection._conn) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._connection._conn) self._affected_rows += CTaosInterface.affectedRows(self._result)
return CTaosInterface.affectedRows(self._connection._conn) return CTaosInterface.affectedRows(self._result )
else: else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn) self._fields = CTaosInterface.useResult(self._result)
return self._handle_result() return self._handle_result()
else: else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn)) raise ProgrammingError(CTaosInterface.errStr(self._result))
def executemany(self, operation, seq_of_parameters): def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters. """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
...@@ -142,12 +142,13 @@ class CTaosInterface(object): ...@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField) libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p #libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p) libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None): def __init__(self, config=None):
''' '''
...@@ -251,10 +252,10 @@ class CTaosInterface(object): ...@@ -251,10 +252,10 @@ class CTaosInterface(object):
# CTaosInterface.libtaos.close(connection) # CTaosInterface.libtaos.close(connection)
@staticmethod @staticmethod
def affectedRows(connection): def affectedRows(result):
"""The affected rows after runing query """The affected rows after runing query
""" """
return CTaosInterface.libtaos.taos_affected_rows(connection) return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod @staticmethod
def subscribe(connection, restart, topic, sql, interval): def subscribe(connection, restart, topic, sql, interval):
...@@ -292,18 +293,17 @@ class CTaosInterface(object): ...@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0) CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod @staticmethod
def useResult(connection): def useResult(result):
'''Use result after calling self.query '''Use result after calling self.query
''' '''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = [] fields = []
pfields = CTaosInterface.fetchFields(result) pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)): for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'), fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes, 'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)}) 'type': ord(pfields[i].type)})
return result, fields return fields
@staticmethod @staticmethod
def fetchBlock(result, fields): def fetchBlock(result, fields):
...@@ -337,8 +337,8 @@ class CTaosInterface(object): ...@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None result.value = None
@staticmethod @staticmethod
def fieldsCount(connection): def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(connection) return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod @staticmethod
def fetchFields(result): def fetchFields(result):
...@@ -386,29 +386,30 @@ class CTaosInterface(object): ...@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod @staticmethod
def errno(connection): def errno(result):
"""Return the error number. """Return the error number.
""" """
return CTaosInterface.libtaos.taos_errno(connection) return CTaosInterface.libtaos.taos_errno(result)
@staticmethod @staticmethod
def errStr(connection): def errStr(result):
"""Return the error styring """Return the error styring
""" """
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8') return CTaosInterface.libtaos.taos_errstr(result).decode('utf-8')
if __name__ == '__main__': if __name__ == '__main__':
cinter = CTaosInterface() cinter = CTaosInterface()
conn = cinter.connect() conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases'))) print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
result, des = CTaosInterface.useResult(conn) fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des) data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data) print(data)
cinter.freeresult(result)
cinter.close(conn) cinter.close(conn)
\ No newline at end of file
...@@ -79,9 +79,7 @@ class TDengineConnection(object): ...@@ -79,9 +79,7 @@ class TDengineConnection(object):
def clear_result_set(self): def clear_result_set(self):
"""Clear unused result set on this connection. """Clear unused result set on this connection.
""" """
result = self._chandle.useResult(self._conn)[0] pass
if result:
self._chandle.freeResult(result)
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
......
...@@ -109,16 +109,16 @@ class TDengineCursor(object): ...@@ -109,16 +109,16 @@ class TDengineCursor(object):
if params is not None: if params is not None:
pass pass
res = CTaosInterface.query(self._connection._conn, stmt) self._result = CTaosInterface.query(self._connection._conn, stmt)
if res == 0: if self._result is not None:
if CTaosInterface.fieldsCount(self._connection._conn) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._connection._conn) self._affected_rows += CTaosInterface.affectedRows(self._result )
return CTaosInterface.affectedRows(self._connection._conn) return CTaosInterface.affectedRows(self._result )
else: else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn) self._fields = CTaosInterface.useResult(self._result )
return self._handle_result() return self._handle_result()
else: else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn)) raise ProgrammingError(CTaosInterface.errStr(self._result ))
def executemany(self, operation, seq_of_parameters): def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters. """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
...@@ -708,25 +708,12 @@ void *readTable(void *sarg) { ...@@ -708,25 +708,12 @@ void *readTable(void *sarg) {
sprintf(command, "select %s from %s%d where ts>= %" PRId64, aggreFunc[j], tb_prefix, i, sTime); sprintf(command, "select %s from %s%d where ts>= %" PRId64, aggreFunc[j], tb_prefix, i, sTime);
double t = getCurrentTime(); double t = getCurrentTime();
<<<<<<< HEAD
/*
if (taos_query(taos, command) != 0) {
fprintf(stderr, "Failed to query\n");
taos_close(taos);
exit(EXIT_FAILURE);
}
*/
TAOS_RES *result = taos_query(taos, command) ;
if (result == NULL) {
fprintf(stderr, "Failed to retreive results:%s\n", taos_errstr(taos));
=======
TAOS_RES *pSql = taos_query(taos, command); TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pSql);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos)); fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos));
taos_free_result(pSql); taos_free_result(pSql);
>>>>>>> 2f976c4f62b7a68626350e3ac15eddff20035b59
taos_close(taos); taos_close(taos);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册