提交 e7ad161a 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/master' into hotfix/crashgen

......@@ -94,7 +94,7 @@ def pre_test(){
make > /dev/null
make install > /dev/null
cd ${WKC}/tests
pip3 install ${WKC}/src/connector/python/linux/python3/
pip3 install ${WKC}/src/connector/python/
'''
return 1
}
......
......@@ -399,27 +399,22 @@ Python连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/
#### Linux
用户可以在源代码的src/connector/python(或者tar.gz的/connector/python)文件夹下找到python2和python3的connector安装包。用户可以通过pip命令安装:
用户可以在源代码的src/connector/python(或者tar.gz的/connector/python)文件夹下找到connector安装包。用户可以通过pip命令安装:
`pip install src/connector/python/linux/python2/`
`pip install src/connector/python/`
`pip3 install src/connector/python/linux/python3/`
`pip3 install src/connector/python/`
#### Windows
在已安装Windows TDengine 客户端的情况下, 将文件"C:\TDengine\driver\taos.dll" 拷贝到 "C:\windows\system32" 目录下, 然后进入Windwos <em>cmd</em> 命令行界面
```cmd
cd C:\TDengine\connector\python\windows
python -m pip install python2\
```
```cmd
cd C:\TDengine\connector\python\windows
python -m pip install python3\
cd C:\TDengine\connector\python
python -m pip install .
```
* 如果机器上没有pip命令,用户可将src/connector/python/python3或src/connector/python/python2下的taos文件夹拷贝到应用程序的目录使用。
* 如果机器上没有pip命令,用户可将src/connector/python下的taos文件夹拷贝到应用程序的目录使用。
对于windows 客户端,安装TDengine windows 客户端后,将C:\TDengine\driver\taos.dll拷贝到C:\windows\system32目录下即可。
### 使用
......
......@@ -156,20 +156,11 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
fi
cp -r ${connector_dir}/python ${install_dir}/connector
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/subscription.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/connection.py
fi
# Copy release note
# cp ${script_dir}/release_note ${install_dir}
......
......@@ -179,20 +179,11 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
fi
cp -r ${connector_dir}/python ${install_dir}/connector/
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/subscription.py
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/subscription.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python2/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/linux/python3/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python2/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/windows/python3/taos/connection.py
sed -i '/self._password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/connection.py
fi
# Copy release note
# cp ${script_dir}/release_note ${install_dir}
......
......@@ -46,7 +46,7 @@ char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
// common
int32_t tsRpcTimer = 1000;
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsMaxShellConns = 50000;
int32_t tsMaxConnections = 5000;
......
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# .env
.env/
.venv/
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# operating system-related files
# file properties cache/storage on macOS
*.DS_Store
# thumbnail cache on Windows
Thumbs.db
# profiling data
.prof
# End of https://www.toptal.com/developers/gitignore/api/python
# TDengine Connector for Python
[TDengine] connector for Python enables python programs to access TDengine, using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
## Install
```sh
pip install git+https://github.com/taosdata/TDengine-connector-python
```
## Source Code
[TDengine] connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine-connector-python).
## License - AGPL
Keep same with [TDengine](https://github.com/taosdata/TDengine).
import taos
conn = taos.connect(host='127.0.0.1',
user='root',
passworkd='taodata',
database='log')
cursor = conn.cursor()
sql = "select * from log.log limit 10"
cursor.execute(sql)
for row in cursor:
print(row)
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 2",
"Operating System :: Linux",
],
)
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
if (self._description[col][1] == FieldType.C_BOOL):
return True
if (dataType.upper() == "TINYINT"):
if (self._description[col][1] == FieldType.C_TINYINT):
return True
if (dataType.upper() == "TINYINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
return True
if (dataType.upper() == "SMALLINT"):
if (self._description[col][1] == FieldType.C_SMALLINT):
return True
if (dataType.upper() == "SMALLINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
return True
if (dataType.upper() == "INT"):
if (self._description[col][1] == FieldType.C_INT):
return True
if (dataType.upper() == "INT UNSIGNED"):
if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
return True
if (dataType.upper() == "BIGINT"):
if (self._description[col][1] == FieldType.C_BIGINT):
return True
if (dataType.upper() == "BIGINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
return True
if (dataType.upper() == "FLOAT"):
if (self._description[col][1] == FieldType.C_FLOAT):
return True
if (dataType.upper() == "DOUBLE"):
if (self._description[col][1] == FieldType.C_DOUBLE):
return True
if (dataType.upper() == "BINARY"):
if (self._description[col][1] == FieldType.C_BINARY):
return True
if (dataType.upper() == "TIMESTAMP"):
if (self._description[col][1] == FieldType.C_TIMESTAMP):
return True
if (dataType.upper() == "NCHAR"):
if (self._description[col][1] == FieldType.C_NCHAR):
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
from .connection import TDengineConnection
from .cursor import TDengineCursor
from .error import Error
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the rowcount of insertion
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def log(self, logfile):
self._logfile = logfile
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def istype(self, col, dataType):
if (dataType.upper() == "BOOL"):
if (self._description[col][1] == FieldType.C_BOOL):
return True
if (dataType.upper() == "TINYINT"):
if (self._description[col][1] == FieldType.C_TINYINT):
return True
if (dataType.upper() == "TINYINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED):
return True
if (dataType.upper() == "SMALLINT"):
if (self._description[col][1] == FieldType.C_SMALLINT):
return True
if (dataType.upper() == "SMALLINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED):
return True
if (dataType.upper() == "INT"):
if (self._description[col][1] == FieldType.C_INT):
return True
if (dataType.upper() == "INT UNSIGNED"):
if (self._description[col][1] == FieldType.C_INT_UNSIGNED):
return True
if (dataType.upper() == "BIGINT"):
if (self._description[col][1] == FieldType.C_BIGINT):
return True
if (dataType.upper() == "BIGINT UNSIGNED"):
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED):
return True
if (dataType.upper() == "FLOAT"):
if (self._description[col][1] == FieldType.C_FLOAT):
return True
if (dataType.upper() == "DOUBLE"):
if (self._description[col][1] == FieldType.C_DOUBLE):
return True
if (dataType.upper() == "BINARY"):
if (self._description[col][1] == FieldType.C_BINARY):
return True
if (dataType.upper() == "TIMESTAMP"):
if (self._description[col][1] == FieldType.C_TIMESTAMP):
return True
if (dataType.upper() == "NCHAR"):
if (self._description[col][1] == FieldType.C_NCHAR):
return True
return False
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: MacOS X",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
......@@ -5,16 +5,30 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="taos",
version="2.0.9",
version="2.0.10",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
url="https://github.com/taosdata/TDengine/tree/develop/src/connector/python",
packages=setuptools.find_packages(),
classifiers=[
"Environment :: Console",
"Environment :: MacOS X",
"Environment :: Win32 (MS Windows)",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: MacOS",
"Programming Language :: Python :: 2.7",
"Operating System :: Linux",
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Operating System :: Microsoft :: Windows :: Windows 10",
],
)
......@@ -3,6 +3,7 @@ from .constants import FieldType
from .error import *
import math
import datetime
import platform
def _convert_millisecond_to_datetime(milli):
......@@ -20,46 +21,28 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if micro:
_timestamp_converter = _convert_microsecond_to_datetime
if num_of_rows > 0:
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_int64))[
:abs(num_of_rows)]]
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bool row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_bool))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_byte))[
:abs(num_of_rows)]]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)]]
def _crow_tinyint_unsigned_to_python(
......@@ -69,92 +52,56 @@ def _crow_tinyint_unsigned_to_python(
micro=False):
"""Function to convert C tinyint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ubyte))[
:abs(num_of_rows)]]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_short))[
:abs(num_of_rows)]]
def _crow_smallint_unsigned_to_python(
data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C smallint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_ushort))[
:abs(num_of_rows)]]
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)]]
def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C int row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint))[
:abs(num_of_rows)]]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python(
......@@ -164,52 +111,33 @@ def _crow_bigint_unsigned_to_python(
micro=False):
"""Function to convert C bigint row to python row
"""
if num_of_rows > 0:
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
else:
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(
ctypes.c_uint64))[
:abs(num_of_rows)]]
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C float row to python row
"""
if num_of_rows > 0:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
else:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)]]
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C double row to python row
"""
if num_of_rows > 0:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
else:
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
return [None if math.isnan(ele) else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)]]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
"""Function to convert C binary row to python row
"""
assert(nbytes is not None)
if num_of_rows > 0:
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
else:
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
return [None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode(
'utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
......@@ -236,30 +164,17 @@ def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""
assert(nbytes is not None)
res = []
if num_of_rows > 0:
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(
data + nbytes * i,
ctypes.POINTER(
ctypes.c_short))[
:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
......@@ -268,20 +183,12 @@ def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, micro=False):
"""
assert(nbytes is not None)
res = []
if num_of_rows >= 0:
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
else:
for i in range(abs(num_of_rows)):
try:
res.append((ctypes.cast(data + nbytes * i + 2,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4))))[0].value)
except ValueError:
res.append(None)
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
return res
......@@ -330,14 +237,38 @@ class TaosField(ctypes.Structure):
# C interface class
def _load_taos_linux():
return ctypes.CDLL('libtaos.so')
def _load_taos_darwin():
return ctypes.cDLL('libtaos.dylib')
def _load_taos_windows():
return ctypes.windll.LoadLibrary('taos')
def _load_taos():
load_func = {
'Linux': _load_taos_linux,
'Darwin': _load_taos_darwin,
'Windows': _load_taos_windows,
}
try:
return load_func[platform.system()]()
except:
sys.exit('unsupported platform to TDengine connector')
class CTaosInterface(object):
libtaos = ctypes.CDLL('libtaos.so')
libtaos = _load_taos()
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
# libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
......@@ -438,7 +369,7 @@ class CTaosInterface(object):
'''Close the TDengine handle
'''
CTaosInterface.libtaos.taos_close(connection)
#print('connection is closed')
# print('connection is closed')
@staticmethod
def query(connection, sql):
......
......@@ -45,6 +45,12 @@ class TDengineCursor(object):
return self
def __next__(self):
return self._taos_next()
def next(self):
return self._taos_next()
def _taos_next(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
......
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 2",
"Operating System :: Windows",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
if len(kwargs) > 0:
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Time precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
self._result = CTaosInterface.query(self._connection._conn, stmt)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
"""Python exceptions
"""
class Error(Exception):
def __init__(self, msg=None, errno=None):
self.msg = msg
self._full_msg = self.msg
self.errno = errno
def __str__(self):
return self._full_msg
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting.
"""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range.
"""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer
"""
pass
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected.
"""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,.
"""
pass
from .cinterface import CTaosInterface
from .error import *
class TDengineSubscription(object):
"""TDengine subscription object
"""
def __init__(self, sub):
self._sub = sub
def consume(self):
"""Consume rows of a subscription
"""
if self._sub is None:
raise OperationalError("Invalid use of consume")
result, fields = CTaosInterface.consume(self._sub)
buffer = [[] for i in range(len(fields))]
while True:
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True):
"""Close the Subscription.
"""
if self._sub is None:
return False
CTaosInterface.unsubscribe(self._sub, keepProgress)
return True
if __name__ == '__main__':
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
for i in range(0, 10):
data = sub.consume()
for d in data:
print(d)
sub.close()
conn.close()
Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
This program is free software: you can use, redistribute, and/or modify
it under the terms of the GNU Affero General Public License, version 3
or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
# TDengine python client interface
\ No newline at end of file
import setuptools
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name="taos",
version="2.0.9",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/pypa/sampleproject",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: Windows",
],
)
from .connection import TDengineConnection
from .cursor import TDengineCursor
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
class TDengineConnection(object):
""" TDengine connection object
"""
def __init__(self, *args, **kwargs):
self._conn = None
self._host = None
self._user = "root"
self._password = "taosdata"
self._database = None
self._port = 0
self._config = None
self._chandle = None
if len(kwargs) > 0:
self.config(**kwargs)
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
# user
if 'user' in kwargs:
self._user = kwargs['user']
# password
if 'password' in kwargs:
self._password = kwargs['password']
# database
if 'database' in kwargs:
self._database = kwargs['database']
# port
if 'port' in kwargs:
self._port = kwargs['port']
# config
if 'config' in kwargs:
self._config = kwargs['config']
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
def cursor(self):
"""Return a new Cursor object using the connection.
"""
return TDengineCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
Since TDengine do not support transactions, the implement is void functionality.
"""
pass
def rollback(self):
"""Void functionality
"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn.close()
print("Hello world")
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
# type_code
C_NULL = 0
C_BOOL = 1
C_TINYINT = 2
C_SMALLINT = 3
C_INT = 4
C_BIGINT = 5
C_FLOAT = 6
C_DOUBLE = 7
C_BINARY = 8
C_TIMESTAMP = 9
C_NCHAR = 10
C_TINYINT_UNSIGNED = 11
C_SMALLINT_UNSIGNED = 12
C_INT_UNSIGNED = 13
C_BIGINT_UNSIGNED = 14
# NULL value definition
# NOTE: These values should change according to C definition in tsdb.h
C_BOOL_NULL = 0x02
C_TINYINT_NULL = -128
C_TINYINT_UNSIGNED_NULL = 255
C_SMALLINT_NULL = -32768
C_SMALLINT_UNSIGNED_NULL = 65535
C_INT_NULL = -2147483648
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
# querySeqNum = 0
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation.
Attributes:
.description: Read-only attribute consists of 7-item sequences:
> name (mondatory)
> type_code (mondatory)
> display_size
> internal_size
> precision
> scale
> null_ok
This attribute will be None for operations that do not return rows or
if the cursor has not had an operation invoked via the .execute*() method yet.
.rowcount:This read-only attribute specifies the number of rows that the last
.execute*() produced (for DQL statements like SELECT) or affected
"""
def __init__(self, connection=None):
self._description = []
self._rowcount = -1
self._connection = None
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
if connection is not None:
self._connection = connection
def __iter__(self):
return self
def __next__(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow(
self._result, self._fields)
if self._block_rows == 0:
raise StopIteration
self._block = list(map(tuple, zip(*block)))
self._block_iter = 0
data = self._block[self._block_iter]
self._block_iter += 1
return data
@property
def description(self):
"""Return the description of the object.
"""
return self._description
@property
def rowcount(self):
"""Return the rowcount of the object
"""
return self._rowcount
@property
def affected_rows(self):
"""Return the affected_rows of the object
"""
return self._affected_rows
def callproc(self, procname, *args):
"""Call a stored database procedure with the given name.
Void functionality since no stored procedures.
"""
pass
def close(self):
"""Close the cursor.
"""
if self._connection is None:
return False
self._reset_result()
self._connection = None
return True
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
if not operation:
return None
if not self._connection:
# TODO : change the exception raised here
raise ProgrammingError("Cursor is not connected")
self._reset_result()
stmt = operation
if params is not None:
pass
self._result = CTaosInterface.query(self._connection._conn, stmt)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno == 0:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
"""
pass
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available.
"""
pass
def fetchmany(self):
pass
def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchRow(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetchall(self):
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0
while True:
block, num_of_fields = CTaosInterface.fetchBlock(
self._result, self._fields)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0:
raise ProgrammingError(
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0:
break
self._rowcount += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def nextset(self):
"""
"""
pass
def setinputsize(self, sizes):
pass
def setutputsize(self, size, column=None):
pass
def _reset_result(self):
"""Reset the result to unused version.
"""
self._description = []
self._rowcount = -1
if self._result is not None:
CTaosInterface.freeResult(self._result)
self._result = None
self._fields = None
self._block = None
self._block_rows = -1
self._block_iter = 0
self._affected_rows = 0
def _handle_result(self):
"""Handle the return result from query.
"""
self._description = []
for ele in self._fields:
self._description.append(
(ele['name'], ele['type'], None, None, None, None, False))
return self._result
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
......@@ -6261,7 +6261,7 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
......@@ -6362,7 +6362,7 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
if (g_queryInfo.superQueryInfo.result[j] != NULL) {
if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
......@@ -6796,7 +6796,7 @@ static void *specifiedSubscribe(void *sarg) {
"taosdemo-subscribe-%"PRIu64"-%d",
pThreadInfo->querySeq,
pThreadInfo->threadID);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != '\0') {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
......
......@@ -8,8 +8,8 @@
3. mkdir debug; cd debug; cmake ..; make ; sudo make install
4. pip install ../src/connector/python/linux/python2 ; pip3 install
../src/connector/python/linux/python3
4. pip install ../src/connector/python ; pip3 install
../src/connector/python
5. pip install numpy; pip3 install numpy (numpy is required only if you need to run querySort.py)
......
......@@ -21,7 +21,7 @@ def pre_test(){
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python/linux/python3/
pip3 install ${WKC}/src/connector/python
'''
return 1
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册