未验证 提交 39f82130 编写于 作者: H Huo Linhe 提交者: GitHub

[TD-182]<enhance>: use single repo for python connector (#6036)

* [TD-182]<enhance>: use single repo for python connector

Remove code for each platform and build up one single python code base
for windows/osx/linux platforms and python2/python3 runtime.

* [TD-182]<enhance>: remove redundant code in python connector

* [TD-4149] <fix>: fix python connection config error
上级 e8190f17
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# .env
.env/
.venv/
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# operating system-related files
# file properties cache/storage on macOS
*.DS_Store
# thumbnail cache on Windows
Thumbs.db
# profiling data
.prof
# End of https://www.toptal.com/developers/gitignore/api/python
# TDengine Connector for Python
[TDengine] connector for Python enables python programs to access TDengine, using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
## Install
```sh
pip install git+https://github.com/taosdata/TDengine-connector-python
```
## Source Code
[TDengine] connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine-connector-python).
## License - AGPL
Keep same with [TDengine](https://github.com/taosdata/TDengine).
import taos
conn = taos.connect(host='127.0.0.1',
user='root',
passworkd='taodata',
database='log')
cursor = conn.cursor()
sql = "select * from log.log limit 10"
cursor.execute(sql)
for row in cursor:
print(row)
../
\ No newline at end of file
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 2",
"Operating System :: Linux",
],
)
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
if (self._description[col][1] == FieldType.C_BOOL):
return True
if (dataType.upper() == "TINYINT"):
if (self._description[col][1] == FieldType.C_TINYINT):
return True
if (dataType.upper() == "TINYINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
return True
if (dataType.upper() == "SMALLINT"):
if (self._description[col][1] == FieldType.C_SMALLINT):
return True
if (dataType.upper() == "SMALLINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
return True
if (dataType.upper() == "INT"):
if (self._description[col][1] == FieldType.C_INT):
return True
if (dataType.upper() == "INT UNSIGNED"):
if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
return True
if (dataType.upper() == "BIGINT"):
if (self._description[col][1] == FieldType.C_BIGINT):
return True
if (dataType.upper() == "BIGINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
return True
if (dataType.upper() == "FLOAT"):
if (self._description[col][1] == FieldType.C_FLOAT):
return True
if (dataType.upper() == "DOUBLE"):
if (self._description[col][1] == FieldType.C_DOUBLE):
return True
if (dataType.upper() == "BINARY"):
if (self._description[col][1] == FieldType.C_BINARY):
return True
if (dataType.upper() == "TIMESTAMP"):
if (self._description[col][1] == FieldType.C_TIMESTAMP):
return True
if (dataType.upper() == "NCHAR"):
if (self._description[col][1] == FieldType.C_NCHAR):
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
../
\ No newline at end of file
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the rowcount of insertion
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
if (self._description[col][1] == FieldType.C_BOOL):
return True
if (dataType.upper() == "TINYINT"):
if (self._description[col][1] == FieldType.C_TINYINT):
return True
if (dataType.upper() == "TINYINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
return True
if (dataType.upper() == "SMALLINT"):
if (self._description[col][1] == FieldType.C_SMALLINT):
return True
if (dataType.upper() == "SMALLINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
return True
if (dataType.upper() == "INT"):
if (self._description[col][1] == FieldType.C_INT):
return True
if (dataType.upper() == "INT UNSIGNED"):
if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
return True
if (dataType.upper() == "BIGINT"):
if (self._description[col][1] == FieldType.C_BIGINT):
return True
if (dataType.upper() == "BIGINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
return True
if (dataType.upper() == "FLOAT"):
if (self._description[col][1] == FieldType.C_FLOAT):
return True
if (dataType.upper() == "DOUBLE"):
if (self._description[col][1] == FieldType.C_DOUBLE):
return True
if (dataType.upper() == "BINARY"):
if (self._description[col][1] == FieldType.C_BINARY):
return True
if (dataType.upper() == "TIMESTAMP"):
if (self._description[col][1] == FieldType.C_TIMESTAMP):
return True
if (dataType.upper() == "NCHAR"):
if (self._description[col][1] == FieldType.C_NCHAR):
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
../
\ No newline at end of file
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: MacOS X",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
......@@ -14,7 +14,22 @@ setuptools.setup(
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Environment :: Console",
"Environment :: MacOS X",
"Environment :: Win32 (MS Windows)",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: MacOS",
"Programming Language :: Python :: 2.7",
"Operating System :: Linux",
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Operating System :: Microsoft :: Windows :: Windows 10",
],
)
......@@ -3,6 +3,7 @@ from .constants import FieldType
from .error import *
import math
import datetime
import platform
def _convert_millisecond_to_datetime(milli):
......@@ -20,46 +21,28 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if micro:
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_bool))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
def _crow_tinyint_unsigned_to_python(
......@@ -69,92 +52,56 @@ def _crow_tinyint_unsigned_to_python(
micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
def _crow_smallint_unsigned_to_python(
data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python(
......@@ -164,52 +111,33 @@ def _crow_bigint_unsigned_to_python(
micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
else:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
else:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
if num_of_rows > 0:
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
else:
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
......@@ -236,30 +164,17 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""
assert(nbytes is not None)
res = []
if num_of_rows > 0:
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
......@@ -268,20 +183,12 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""
assert(nbytes is not None)
res = []
if num_of_rows >= 0:
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
res.append((ctypes.cast(data + nbytes * i + 2,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
return res
......@@ -330,14 +237,38 @@ class TaosField(ctypes.Structure):
# C interface class
def _load_taos_linux():
return ctypes.CDLL('libtaos.so')
def _load_taos_darwin():
return ctypes.cDLL('libtaos.dylib')
def _load_taos_windows():
return ctypes.windll.LoadLibrary('taos')
def _load_taos():
load_func = {
'Linux': _load_taos_linux,
'Darwin': _load_taos_darwin,
'Windows': _load_taos_windows,
}
try:
return load_func[platform.system()]()
except:
sys.exit('unsupported platform to TDengine connector')
class CTaosInterface(object):
libtaos = ctypes.CDLL('libtaos.so')
libtaos = _load_taos()
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
# libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
......@@ -438,7 +369,7 @@ class CTaosInterface(object):
'''Close the TDengine handle
'''
CTaosInterface.libtaos.taos_close(connection)
#print('connection is closed')
# print('connection is closed')
@staticmethod
def query(connection, sql):
......
......@@ -45,6 +45,12 @@ class TDengineCursor(object):
return self
def __next__(self):
return self._taos_next()
def next(self):
return self._taos_next()
def _taos_next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
......
../
\ No newline at end of file
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 2",
"Operating System :: Windows",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
if len(kwargs) > 0:
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Time precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
self._result = CTaosInterface.query(self._connection._conn, stmt)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
../
\ No newline at end of file
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: Windows",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
if len(kwargs) > 0:
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
self._result = CTaosInterface.query(self._connection._conn, stmt)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册