提交 42efe016 编写于 作者: sangshuduo's avatar sangshuduo

[TD-2971] feature: make python connector support unsigned type. fix pep8 format.

上级 8904a046
...@@ -8,6 +8,7 @@ paramstyle = 'pyformat' ...@@ -8,6 +8,7 @@ paramstyle = 'pyformat'
__all__ = ['connection', 'cursor'] __all__ = ['connection', 'cursor']
def connect(*args, **kwargs): def connect(*args, **kwargs):
""" Function to return a TDengine connector object """ Function to return a TDengine connector object
......
...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor ...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor
from .subscription import TDengineSubscription from .subscription import TDengineSubscription
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
class TDengineConnection(object): class TDengineConnection(object):
""" TDengine connection object """ TDengine connection object
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._conn = None self._conn = None
self._host = None self._host = None
...@@ -29,7 +31,7 @@ class TDengineConnection(object): ...@@ -29,7 +31,7 @@ class TDengineConnection(object):
# password # password
if 'password' in kwargs: if 'password' in kwargs:
self._password = kwargs['password'] self._password = kwargs['password']
# database # database
if 'database' in kwargs: if 'database' in kwargs:
self._database = kwargs['database'] self._database = kwargs['database']
...@@ -43,7 +45,12 @@ class TDengineConnection(object): ...@@ -43,7 +45,12 @@ class TDengineConnection(object):
self._config = kwargs['config'] self._config = kwargs['config']
self._chandle = CTaosInterface(self._config) self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self): def close(self):
"""Close current connection. """Close current connection.
...@@ -55,7 +62,8 @@ class TDengineConnection(object): ...@@ -55,7 +62,8 @@ class TDengineConnection(object):
""" """
if self._conn is None: if self._conn is None:
return None return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval) sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub) return TDengineSubscription(sub)
def cursor(self): def cursor(self):
...@@ -80,7 +88,8 @@ class TDengineConnection(object): ...@@ -80,7 +88,8 @@ class TDengineConnection(object):
""" """
pass pass
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
conn.close() conn.close()
print("Hello world") print("Hello world")
\ No newline at end of file
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
from .dbapi import * from .dbapi import *
class FieldType(object): class FieldType(object):
"""TDengine Field Types """TDengine Field Types
""" """
......
...@@ -128,8 +128,8 @@ class TDengineCursor(object): ...@@ -128,8 +128,8 @@ class TDengineCursor(object):
if errno == 0: if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows( self._affected_rows += CTaosInterface.affectedRows(
self._result ) self._result)
return CTaosInterface.affectedRows(self._result ) return CTaosInterface.affectedRows(self._result)
else: else:
self._fields = CTaosInterface.useResult( self._fields = CTaosInterface.useResult(
self._result) self._result)
...@@ -148,6 +148,7 @@ class TDengineCursor(object): ...@@ -148,6 +148,7 @@ class TDengineCursor(object):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available. """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
""" """
pass pass
def fetchmany(self): def fetchmany(self):
pass pass
...@@ -206,16 +207,20 @@ class TDengineCursor(object): ...@@ -206,16 +207,20 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
for i in range(len(self._fields)): for i in range(len(self._fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def fetchall(self): def fetchall(self):
if self._result is None or self._fields is None: if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
...@@ -223,16 +228,20 @@ class TDengineCursor(object): ...@@ -223,16 +228,20 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
for i in range(len(self._fields)): for i in range(len(self._fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def nextset(self): def nextset(self):
""" """
""" """
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import time import time
import datetime import datetime
class DBAPITypeObject(object): class DBAPITypeObject(object):
def __init__(self, *values): def __init__(self, *values):
self.values = values self.values = values
...@@ -16,23 +17,28 @@ class DBAPITypeObject(object): ...@@ -16,23 +17,28 @@ class DBAPITypeObject(object):
else: else:
return -1 return -1
Date = datetime.date Date = datetime.date
Time = datetime.time Time = datetime.time
Timestamp = datetime.datetime Timestamp = datetime.datetime
def DataFromTicks(ticks): def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3]) return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks): def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6]) return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks): def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6]) return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types()) # STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types()) # BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types()) # NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types()) # DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject() # ROWID = DBAPITypeObject()
\ No newline at end of file
"""Python exceptions """Python exceptions
""" """
class Error(Exception): class Error(Exception):
def __init__(self, msg=None, errno=None): def __init__(self, msg=None, errno=None):
self.msg = msg self.msg = msg
self._full_msg = self.msg self._full_msg = self.msg
self.errno = errno self.errno = errno
def __str__(self): def __str__(self):
return self._full_msg return self._full_msg
class Warning(Exception): class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting. """Exception raised for important warnings like data truncations while inserting.
""" """
pass pass
class InterfaceError(Error): class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself. """Exception raised for errors that are related to the database interface rather than the database itself.
""" """
pass pass
class DatabaseError(Error): class DatabaseError(Error):
"""Exception raised for errors that are related to the database. """Exception raised for errors that are related to the database.
""" """
pass pass
class DataError(DatabaseError): class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range. """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
""" """
pass pass
class OperationalError(DatabaseError): class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
""" """
...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError): ...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError):
""" """
pass pass
class InternalError(DatabaseError): class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error. """Exception raised when the database encounters an internal error.
""" """
pass pass
class ProgrammingError(DatabaseError): class ProgrammingError(DatabaseError):
"""Exception raised for programming errors. """Exception raised for programming errors.
""" """
pass pass
class NotSupportedError(DatabaseError): class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,. """Exception raised in case a method or database API was used which is not supported by the database,.
""" """
pass pass
\ No newline at end of file
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
class TDengineSubscription(object): class TDengineSubscription(object):
"""TDengine subscription object """TDengine subscription object
""" """
def __init__(self, sub): def __init__(self, sub):
self._sub = sub self._sub = sub
def consume(self): def consume(self):
"""Consume rows of a subscription """Consume rows of a subscription
""" """
if self._sub is None: if self._sub is None:
raise OperationalError("Invalid use of consume") raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub) result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))] buffer = [[] for i in range(len(fields))]
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields) block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break if num_of_fields == 0:
break
for i in range(len(fields)): for i in range(len(fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
self.fields = fields self.fields = fields
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
def close(self, keepProgress = True):
"""Close the Subscription. """Close the Subscription.
""" """
if self._sub is None: if self._sub is None:
return False return False
CTaosInterface.unsubscribe(self._sub, keepProgress) CTaosInterface.unsubscribe(self._sub, keepProgress)
return True return True
if __name__ == '__main__': if __name__ == '__main__':
from .connection import TDengineConnection from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test") conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands # Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000) sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10): for i in range(0, 10):
data = sub.consume() data = sub.consume()
for d in data: for d in data:
print(d) print(d)
sub.close() sub.close()
conn.close() conn.close()
\ No newline at end of file
...@@ -8,6 +8,7 @@ paramstyle = 'pyformat' ...@@ -8,6 +8,7 @@ paramstyle = 'pyformat'
__all__ = ['connection', 'cursor'] __all__ = ['connection', 'cursor']
def connect(*args, **kwargs): def connect(*args, **kwargs):
""" Function to return a TDengine connector object """ Function to return a TDengine connector object
......
...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor ...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor
from .subscription import TDengineSubscription from .subscription import TDengineSubscription
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
class TDengineConnection(object): class TDengineConnection(object):
""" TDengine connection object """ TDengine connection object
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._conn = None self._conn = None
self._host = None self._host = None
...@@ -29,7 +31,7 @@ class TDengineConnection(object): ...@@ -29,7 +31,7 @@ class TDengineConnection(object):
# password # password
if 'password' in kwargs: if 'password' in kwargs:
self._password = kwargs['password'] self._password = kwargs['password']
# database # database
if 'database' in kwargs: if 'database' in kwargs:
self._database = kwargs['database'] self._database = kwargs['database']
...@@ -43,7 +45,12 @@ class TDengineConnection(object): ...@@ -43,7 +45,12 @@ class TDengineConnection(object):
self._config = kwargs['config'] self._config = kwargs['config']
self._chandle = CTaosInterface(self._config) self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self): def close(self):
"""Close current connection. """Close current connection.
...@@ -55,7 +62,8 @@ class TDengineConnection(object): ...@@ -55,7 +62,8 @@ class TDengineConnection(object):
""" """
if self._conn is None: if self._conn is None:
return None return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval) sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub) return TDengineSubscription(sub)
def cursor(self): def cursor(self):
...@@ -80,7 +88,8 @@ class TDengineConnection(object): ...@@ -80,7 +88,8 @@ class TDengineConnection(object):
""" """
pass pass
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
conn.close() conn.close()
print("Hello world") print("Hello world")
\ No newline at end of file
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
from .dbapi import * from .dbapi import *
class FieldType(object): class FieldType(object):
"""TDengine Field Types """TDengine Field Types
""" """
......
...@@ -5,6 +5,7 @@ import threading ...@@ -5,6 +5,7 @@ import threading
# querySeqNum = 0 # querySeqNum = 0
class TDengineCursor(object): class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation. """Database cursor which is used to manage the context of a fetch operation.
...@@ -107,8 +108,8 @@ class TDengineCursor(object): ...@@ -107,8 +108,8 @@ class TDengineCursor(object):
# if threading.get_ident() != self._threadId: # if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) # info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info) # raise OperationalError(info)
# print(info) # print(info)
# return None # return None
if not operation: if not operation:
return None return None
...@@ -137,8 +138,8 @@ class TDengineCursor(object): ...@@ -137,8 +138,8 @@ class TDengineCursor(object):
if errno == 0: if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows( self._affected_rows += CTaosInterface.affectedRows(
self._result ) self._result)
return CTaosInterface.affectedRows(self._result ) return CTaosInterface.affectedRows(self._result)
else: else:
self._fields = CTaosInterface.useResult( self._fields = CTaosInterface.useResult(
self._result) self._result)
...@@ -216,10 +217,13 @@ class TDengineCursor(object): ...@@ -216,10 +217,13 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
...@@ -234,15 +238,20 @@ class TDengineCursor(object): ...@@ -234,15 +238,20 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
if num_of_fields == 0: break CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields self._rowcount += num_of_fields
for i in range(len(self._fields)): for i in range(len(self._fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def nextset(self): def nextset(self):
""" """
""" """
...@@ -274,8 +283,8 @@ class TDengineCursor(object): ...@@ -274,8 +283,8 @@ class TDengineCursor(object):
# if threading.get_ident() != self._threadId: # if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) # info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info) # raise OperationalError(info)
# print(info) # print(info)
# return None # return None
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
...@@ -283,4 +292,3 @@ class TDengineCursor(object): ...@@ -283,4 +292,3 @@ class TDengineCursor(object):
(ele['name'], ele['type'], None, None, None, None, False)) (ele['name'], ele['type'], None, None, None, None, False))
return self._result return self._result
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import time import time
import datetime import datetime
class DBAPITypeObject(object): class DBAPITypeObject(object):
def __init__(self, *values): def __init__(self, *values):
self.values = values self.values = values
...@@ -16,23 +17,28 @@ class DBAPITypeObject(object): ...@@ -16,23 +17,28 @@ class DBAPITypeObject(object):
else: else:
return -1 return -1
Date = datetime.date Date = datetime.date
Time = datetime.time Time = datetime.time
Timestamp = datetime.datetime Timestamp = datetime.datetime
def DataFromTicks(ticks): def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3]) return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks): def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6]) return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks): def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6]) return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types()) # STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types()) # BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types()) # NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types()) # DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject() # ROWID = DBAPITypeObject()
\ No newline at end of file
"""Python exceptions """Python exceptions
""" """
class Error(Exception): class Error(Exception):
def __init__(self, msg=None, errno=None): def __init__(self, msg=None, errno=None):
self.msg = msg self.msg = msg
self._full_msg = self.msg self._full_msg = self.msg
self.errno = errno self.errno = errno
def __str__(self): def __str__(self):
return self._full_msg return self._full_msg
class Warning(Exception): class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting. """Exception raised for important warnings like data truncations while inserting.
""" """
pass pass
class InterfaceError(Error): class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself. """Exception raised for errors that are related to the database interface rather than the database itself.
""" """
pass pass
class DatabaseError(Error): class DatabaseError(Error):
"""Exception raised for errors that are related to the database. """Exception raised for errors that are related to the database.
""" """
pass pass
class DataError(DatabaseError): class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range. """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
""" """
pass pass
class OperationalError(DatabaseError): class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
""" """
...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError): ...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError):
""" """
pass pass
class InternalError(DatabaseError): class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error. """Exception raised when the database encounters an internal error.
""" """
pass pass
class ProgrammingError(DatabaseError): class ProgrammingError(DatabaseError):
"""Exception raised for programming errors. """Exception raised for programming errors.
""" """
pass pass
class NotSupportedError(DatabaseError): class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,. """Exception raised in case a method or database API was used which is not supported by the database,.
""" """
pass pass
\ No newline at end of file
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
class TDengineSubscription(object): class TDengineSubscription(object):
"""TDengine subscription object """TDengine subscription object
""" """
def __init__(self, sub): def __init__(self, sub):
self._sub = sub self._sub = sub
def consume(self): def consume(self):
"""Consume rows of a subscription """Consume rows of a subscription
""" """
if self._sub is None: if self._sub is None:
raise OperationalError("Invalid use of consume") raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub) result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))] buffer = [[] for i in range(len(fields))]
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields) block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break if num_of_fields == 0:
break
for i in range(len(fields)): for i in range(len(fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
self.fields = fields self.fields = fields
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
def close(self, keepProgress = True):
"""Close the Subscription. """Close the Subscription.
""" """
if self._sub is None: if self._sub is None:
...@@ -38,15 +39,19 @@ class TDengineSubscription(object): ...@@ -38,15 +39,19 @@ class TDengineSubscription(object):
if __name__ == '__main__': if __name__ == '__main__':
from .connection import TDengineConnection from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test") conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands # Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000) sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10): for i in range(0, 10):
data = sub.consume() data = sub.consume()
for d in data: for d in data:
print(d) print(d)
sub.close() sub.close()
conn.close() conn.close()
\ No newline at end of file
...@@ -8,6 +8,7 @@ paramstyle = 'pyformat' ...@@ -8,6 +8,7 @@ paramstyle = 'pyformat'
__all__ = ['connection', 'cursor'] __all__ = ['connection', 'cursor']
def connect(*args, **kwargs): def connect(*args, **kwargs):
""" Function to return a TDengine connector object """ Function to return a TDengine connector object
......
...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor ...@@ -2,9 +2,11 @@ from .cursor import TDengineCursor
from .subscription import TDengineSubscription from .subscription import TDengineSubscription
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
class TDengineConnection(object): class TDengineConnection(object):
""" TDengine connection object """ TDengine connection object
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._conn = None self._conn = None
self._host = None self._host = None
...@@ -29,7 +31,7 @@ class TDengineConnection(object): ...@@ -29,7 +31,7 @@ class TDengineConnection(object):
# password # password
if 'password' in kwargs: if 'password' in kwargs:
self._password = kwargs['password'] self._password = kwargs['password']
# database # database
if 'database' in kwargs: if 'database' in kwargs:
self._database = kwargs['database'] self._database = kwargs['database']
...@@ -43,7 +45,12 @@ class TDengineConnection(object): ...@@ -43,7 +45,12 @@ class TDengineConnection(object):
self._config = kwargs['config'] self._config = kwargs['config']
self._chandle = CTaosInterface(self._config) self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port) self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self): def close(self):
"""Close current connection. """Close current connection.
...@@ -55,7 +62,8 @@ class TDengineConnection(object): ...@@ -55,7 +62,8 @@ class TDengineConnection(object):
""" """
if self._conn is None: if self._conn is None:
return None return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval) sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub) return TDengineSubscription(sub)
def cursor(self): def cursor(self):
...@@ -80,7 +88,8 @@ class TDengineConnection(object): ...@@ -80,7 +88,8 @@ class TDengineConnection(object):
""" """
pass pass
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TDengineConnection(host='192.168.1.107')
conn.close() conn.close()
print("Hello world") print("Hello world")
\ No newline at end of file
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
from .dbapi import * from .dbapi import *
class FieldType(object): class FieldType(object):
"""TDengine Field Types """TDengine Field Types
""" """
......
...@@ -5,6 +5,7 @@ import threading ...@@ -5,6 +5,7 @@ import threading
# querySeqNum = 0 # querySeqNum = 0
class TDengineCursor(object): class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation. """Database cursor which is used to manage the context of a fetch operation.
...@@ -107,8 +108,8 @@ class TDengineCursor(object): ...@@ -107,8 +108,8 @@ class TDengineCursor(object):
# if threading.get_ident() != self._threadId: # if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) # info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info) # raise OperationalError(info)
# print(info) # print(info)
# return None # return None
if not operation: if not operation:
return None return None
...@@ -137,8 +138,8 @@ class TDengineCursor(object): ...@@ -137,8 +138,8 @@ class TDengineCursor(object):
if errno == 0: if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0: if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows( self._affected_rows += CTaosInterface.affectedRows(
self._result ) self._result)
return CTaosInterface.affectedRows(self._result ) return CTaosInterface.affectedRows(self._result)
else: else:
self._fields = CTaosInterface.useResult( self._fields = CTaosInterface.useResult(
self._result) self._result)
...@@ -216,10 +217,13 @@ class TDengineCursor(object): ...@@ -216,10 +217,13 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchRow(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
...@@ -234,15 +238,20 @@ class TDengineCursor(object): ...@@ -234,15 +238,20 @@ class TDengineCursor(object):
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result) errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno) raise ProgrammingError(
if num_of_fields == 0: break CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields self._rowcount += num_of_fields
for i in range(len(self._fields)): for i in range(len(self._fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def nextset(self): def nextset(self):
""" """
""" """
...@@ -274,8 +283,8 @@ class TDengineCursor(object): ...@@ -274,8 +283,8 @@ class TDengineCursor(object):
# if threading.get_ident() != self._threadId: # if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident()) # info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info) # raise OperationalError(info)
# print(info) # print(info)
# return None # return None
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
...@@ -283,4 +292,3 @@ class TDengineCursor(object): ...@@ -283,4 +292,3 @@ class TDengineCursor(object):
(ele['name'], ele['type'], None, None, None, None, False)) (ele['name'], ele['type'], None, None, None, None, False))
return self._result return self._result
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
import time import time
import datetime import datetime
class DBAPITypeObject(object): class DBAPITypeObject(object):
def __init__(self, *values): def __init__(self, *values):
self.values = values self.values = values
...@@ -16,23 +17,28 @@ class DBAPITypeObject(object): ...@@ -16,23 +17,28 @@ class DBAPITypeObject(object):
else: else:
return -1 return -1
Date = datetime.date Date = datetime.date
Time = datetime.time Time = datetime.time
Timestamp = datetime.datetime Timestamp = datetime.datetime
def DataFromTicks(ticks): def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3]) return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks): def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6]) return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks): def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6]) return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types()) # STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types()) # BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types()) # NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types()) # DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject() # ROWID = DBAPITypeObject()
\ No newline at end of file
"""Python exceptions """Python exceptions
""" """
class Error(Exception): class Error(Exception):
def __init__(self, msg=None, errno=None): def __init__(self, msg=None, errno=None):
self.msg = msg self.msg = msg
self._full_msg = self.msg self._full_msg = self.msg
self.errno = errno self.errno = errno
def __str__(self): def __str__(self):
return self._full_msg return self._full_msg
class Warning(Exception): class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting. """Exception raised for important warnings like data truncations while inserting.
""" """
pass pass
class InterfaceError(Error): class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself. """Exception raised for errors that are related to the database interface rather than the database itself.
""" """
pass pass
class DatabaseError(Error): class DatabaseError(Error):
"""Exception raised for errors that are related to the database. """Exception raised for errors that are related to the database.
""" """
pass pass
class DataError(DatabaseError): class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range. """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
""" """
pass pass
class OperationalError(DatabaseError): class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
""" """
...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError): ...@@ -41,17 +47,20 @@ class IntegrityError(DatabaseError):
""" """
pass pass
class InternalError(DatabaseError): class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error. """Exception raised when the database encounters an internal error.
""" """
pass pass
class ProgrammingError(DatabaseError): class ProgrammingError(DatabaseError):
"""Exception raised for programming errors. """Exception raised for programming errors.
""" """
pass pass
class NotSupportedError(DatabaseError): class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,. """Exception raised in case a method or database API was used which is not supported by the database,.
""" """
pass pass
\ No newline at end of file
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
class TDengineSubscription(object): class TDengineSubscription(object):
"""TDengine subscription object """TDengine subscription object
""" """
def __init__(self, sub): def __init__(self, sub):
self._sub = sub self._sub = sub
def consume(self): def consume(self):
"""Consume rows of a subscription """Consume rows of a subscription
""" """
if self._sub is None: if self._sub is None:
raise OperationalError("Invalid use of consume") raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub) result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))] buffer = [[] for i in range(len(fields))]
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields) block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break if num_of_fields == 0:
break
for i in range(len(fields)): for i in range(len(fields)):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
self.fields = fields self.fields = fields
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
def close(self, keepProgress = True):
"""Close the Subscription. """Close the Subscription.
""" """
if self._sub is None: if self._sub is None:
...@@ -38,15 +39,19 @@ class TDengineSubscription(object): ...@@ -38,15 +39,19 @@ class TDengineSubscription(object):
if __name__ == '__main__': if __name__ == '__main__':
from .connection import TDengineConnection from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test") conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands # Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000) sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10): for i in range(0, 10):
data = sub.consume() data = sub.consume()
for d in data: for d in data:
print(d) print(d)
sub.close() sub.close()
conn.close() conn.close()
\ No newline at end of file
...@@ -25,7 +25,8 @@ class TDTestCase: ...@@ -25,7 +25,8 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
ret = tdSql.execute('create table tb (ts timestamp, speed int unsigned)') ret = tdSql.execute(
'create table tb (ts timestamp, speed int unsigned)')
insertRows = 10 insertRows = 10
tdLog.info("insert %d rows" % (insertRows)) tdLog.info("insert %d rows" % (insertRows))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册