提交 f12c270e 编写于 作者: weixin_48148422's avatar weixin_48148422

tbase-1470: python2, linux

上级 781c0ff6
......@@ -13,14 +13,14 @@ def _convert_microsecond_to_datetime(micro):
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
_timstamp_converter = _convert_millisecond_to_datetime
_timestamp_converter = _convert_millisecond_to_datetime
if micro:
_timstamp_converter = _convert_microsecond_to_datetime
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)][::-1]))
else:
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
......@@ -144,6 +144,8 @@ class CTaosInterface(object):
libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p
def __init__(self, config=None):
'''
......@@ -252,6 +254,41 @@ class CTaosInterface(object):
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
"""Create a subscription
@restart boolean,
@sql string, sql statement for data query, must be a 'select' statement.
@topic string, name of this subscription
"""
return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe(
connection,
1 if restart else 0,
ctypes.c_char_p(topic.encode('utf-8')),
ctypes.c_char_p(sql.encode('utf-8')),
None,
None,
interval))
@staticmethod
def consume(sub):
"""Consume data of a subscription
"""
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.libtaos.taos_num_fields(result)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
@staticmethod
def unsubscribe(sub, keepProgress):
"""Cancel a subscription
"""
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod
def useResult(connection):
'''Use result after calling self.query
......@@ -275,8 +312,8 @@ class CTaosInterface(object):
if num_of_rows == 0:
return None, 0
blocks = [None] * len(fields)
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
......@@ -352,3 +389,19 @@ class CTaosInterface(object):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection)
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
result, des = CTaosInterface.useResult(conn)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
print(data)
cinter.close(conn)
\ No newline at end of file
# from .cursor import TDengineCursor
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
......@@ -50,6 +50,14 @@ class TDengineConnection(object):
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
......
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
print(buffer)
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0: break
for i in range(len(fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress = True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
\ No newline at end of file
......@@ -34,3 +34,19 @@ class TDengineSubscription(object):
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0,10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册