提交 a614653b 编写于 作者: S shenglian zhou

Merge branch 'master' into hotfix/td-6396

...@@ -19,6 +19,7 @@ else ...@@ -19,6 +19,7 @@ else
fi fi
# Dynamic directory # Dynamic directory
data_dir="/var/lib/taos" data_dir="/var/lib/taos"
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
...@@ -29,25 +30,32 @@ fi ...@@ -29,25 +30,32 @@ fi
data_link_dir="/usr/local/taos/data" data_link_dir="/usr/local/taos/data"
log_link_dir="/usr/local/taos/log" log_link_dir="/usr/local/taos/log"
if [ "$osType" != "Darwin" ]; then
cfg_install_dir="/etc/taos" cfg_install_dir="/etc/taos"
else
cfg_install_dir="/usr/local/Cellar/tdengine/${verNumber}/taos"
fi
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
bin_link_dir="/usr/bin" bin_link_dir="/usr/bin"
lib_link_dir="/usr/lib" lib_link_dir="/usr/lib"
lib64_link_dir="/usr/lib64" lib64_link_dir="/usr/lib64"
inc_link_dir="/usr/include" inc_link_dir="/usr/include"
else
bin_link_dir="/usr/local/bin"
lib_link_dir="/usr/local/lib"
inc_link_dir="/usr/local/include"
fi fi
#install main path #install main path
install_main_dir="/usr/local/taos" if [ "$osType" != "Darwin" ]; then
install_main_dir="/usr/local/taos"
else
install_main_dir="/usr/local/Cellar/tdengine/${verNumber}"
fi
# old bin dir # old bin dir
bin_dir="/usr/local/taos/bin" if [ "$osType" != "Darwin" ]; then
bin_dir="/usr/local/taos/bin"
else
bin_dir="/usr/local/Cellar/tdengine/${verNumber}/bin"
fi
service_config_dir="/etc/systemd/system" service_config_dir="/etc/systemd/system"
...@@ -59,12 +67,11 @@ GREEN_UNDERLINE='\033[4;32m' ...@@ -59,12 +67,11 @@ GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
csudo="" csudo=""
if command -v sudo > /dev/null; then
csudo="sudo"
fi
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
if command -v sudo > /dev/null; then
csudo="sudo"
fi
initd_mod=0 initd_mod=0
service_mod=2 service_mod=2
if pidof systemd &> /dev/null; then if pidof systemd &> /dev/null; then
...@@ -137,18 +144,16 @@ function install_main_path() { ...@@ -137,18 +144,16 @@ function install_main_path() {
function install_bin() { function install_bin() {
# Remove links # Remove links
${csudo} rm -f ${bin_link_dir}/taos || :
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || : ${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/perfMonitor || : ${csudo} rm -f ${bin_link_dir}/perfMonitor || :
${csudo} rm -f ${bin_link_dir}/taosdump || : ${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${bin_link_dir}/set_core || : ${csudo} rm -f ${bin_link_dir}/set_core || :
${csudo} rm -f ${bin_link_dir}/rmtaos || :
fi fi
${csudo} rm -f ${bin_link_dir}/rmtaos || :
${csudo} cp -r ${binary_dir}/build/bin/* ${install_main_dir}/bin ${csudo} cp -r ${binary_dir}/build/bin/* ${install_main_dir}/bin
${csudo} cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin ${csudo} cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin
...@@ -162,20 +167,17 @@ function install_bin() { ...@@ -162,20 +167,17 @@ function install_bin() {
${csudo} chmod 0555 ${install_main_dir}/bin/* ${csudo} chmod 0555 ${install_main_dir}/bin/*
#Make link #Make link
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || :
[ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || : [ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || : [ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || : [ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo} ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || : [ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo} ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || :
[ -x ${install_main_dir}/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
fi fi
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || :
else
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
fi fi
} }
...@@ -222,7 +224,7 @@ function install_jemalloc() { ...@@ -222,7 +224,7 @@ function install_jemalloc() {
fi fi
if [ -d /etc/ld.so.conf.d ]; then if [ -d /etc/ld.so.conf.d ]; then
${csudo} echo "/usr/local/lib" > /etc/ld.so.conf.d/jemalloc.conf echo "/usr/local/lib" | ${csudo} tee /etc/ld.so.conf.d/jemalloc.conf
${csudo} ldconfig ${csudo} ldconfig
else else
echo "/etc/ld.so.conf.d not found!" echo "/etc/ld.so.conf.d not found!"
...@@ -248,10 +250,8 @@ function install_lib() { ...@@ -248,10 +250,8 @@ function install_lib() {
fi fi
else else
${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/* ${csudo} cp -Rf ${binary_dir}/build/lib/libtaos.* ${install_main_dir}/driver && ${csudo} chmod 777 ${install_main_dir}/driver/*
${csudo} ln -sf ${install_main_dir}/driver/libtaos.1.dylib ${lib_link_dir}/libtaos.1.dylib
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
fi fi
install_jemalloc install_jemalloc
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
...@@ -261,10 +261,14 @@ function install_lib() { ...@@ -261,10 +261,14 @@ function install_lib() {
function install_header() { function install_header() {
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || : if [ "$osType" != "Darwin" ]; then
${csudo} rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taoserror.h || :
fi
${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/* ${csudo} cp -f ${source_dir}/src/inc/taos.h ${source_dir}/src/inc/taoserror.h ${install_main_dir}/include && ${csudo} chmod 644 ${install_main_dir}/include/*
${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h if [ "$osType" != "Darwin" ]; then
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h ${csudo} ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo} ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
fi
} }
function install_config() { function install_config() {
...@@ -272,29 +276,30 @@ function install_config() { ...@@ -272,29 +276,30 @@ function install_config() {
if [ ! -f ${cfg_install_dir}/taos.cfg ]; then if [ ! -f ${cfg_install_dir}/taos.cfg ]; then
${csudo} mkdir -p ${cfg_install_dir} ${csudo} mkdir -p ${cfg_install_dir}
[ -f ${script_dir}/../cfg/taos.cfg ] && ${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir} [ -f ${script_dir}/../cfg/taos.cfg ] &&
${csudo} cp ${script_dir}/../cfg/taos.cfg ${cfg_install_dir}
${csudo} chmod 644 ${cfg_install_dir}/* ${csudo} chmod 644 ${cfg_install_dir}/*
fi fi
${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org ${csudo} cp -f ${script_dir}/../cfg/taos.cfg ${install_main_dir}/cfg/taos.cfg.org
${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
if [ "$osType" != "Darwin" ]; then ${csudo} ln -s ${cfg_install_dir}/taos.cfg ${install_main_dir}/cfg
fi
} }
function install_log() { function install_log() {
${csudo} rm -rf ${log_dir} || :
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir} ${csudo} rm -rf ${log_dir} || :
else ${csudo} mkdir -p ${log_dir} && ${csudo} chmod 777 ${log_dir}
mkdir -p ${log_dir} && chmod 777 ${log_dir} ${csudo} ln -s ${log_dir} ${install_main_dir}/log
fi fi
${csudo} ln -s ${log_dir} ${install_main_dir}/log
} }
function install_data() { function install_data() {
${csudo} mkdir -p ${data_dir} if [ "$osType" != "Darwin" ]; then
${csudo} ln -s ${data_dir} ${install_main_dir}/data ${csudo} mkdir -p ${data_dir}
${csudo} ln -s ${data_dir} ${install_main_dir}/data
fi
} }
function install_connector() { function install_connector() {
...@@ -309,7 +314,6 @@ function install_connector() { ...@@ -309,7 +314,6 @@ function install_connector() {
echo "WARNING: go connector not found, please check if want to use it!" echo "WARNING: go connector not found, please check if want to use it!"
fi fi
${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector ${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector
${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null ${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null
} }
...@@ -489,24 +493,24 @@ function install_TDengine() { ...@@ -489,24 +493,24 @@ function install_TDengine() {
else else
echo -e "${GREEN}Start to install TDEngine Client ...${NC}" echo -e "${GREEN}Start to install TDEngine Client ...${NC}"
fi fi
install_main_path install_main_path
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
install_data install_data
fi fi
install_log install_log
install_header install_header
install_lib install_lib
install_connector install_connector
install_examples install_examples
install_bin install_bin
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
install_service install_service
fi fi
install_config install_config
if [ "$osType" != "Darwin" ]; then if [ "$osType" != "Darwin" ]; then
......
...@@ -307,6 +307,7 @@ typedef struct { ...@@ -307,6 +307,7 @@ typedef struct {
char * data; char * data;
TAOS_ROW tsrow; TAOS_ROW tsrow;
TAOS_ROW urow; TAOS_ROW urow;
bool dataConverted;
int32_t* length; // length for each field for current row int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex* pColumnIndex; SColumnIndex* pColumnIndex;
...@@ -439,7 +440,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo *pQueryInfo); ...@@ -439,7 +440,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo); void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
......
...@@ -1522,6 +1522,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -1522,6 +1522,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
pSql->isBind = true; pSql->isBind = true;
pStmt->pSql = pSql; pStmt->pSql = pSql;
pStmt->last = STMT_INIT; pStmt->last = STMT_INIT;
registerSqlObj(pSql);
return pStmt; return pStmt;
} }
...@@ -1783,7 +1784,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1783,7 +1784,9 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_close(TAOS_STMT* stmt) { int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK if (pStmt == NULL || pStmt->taos == NULL) {
STMT_RET(TSDB_CODE_TSC_DISCONNECTED);
}
if (!pStmt->isInsert) { if (!pStmt->isInsert) {
SNormalStmt* normal = &pStmt->normal; SNormalStmt* normal = &pStmt->normal;
if (normal->params != NULL) { if (normal->params != NULL) {
...@@ -1805,8 +1808,9 @@ int taos_stmt_close(TAOS_STMT* stmt) { ...@@ -1805,8 +1808,9 @@ int taos_stmt_close(TAOS_STMT* stmt) {
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta); pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
if (pStmt->pSql){ if (pStmt->pSql){
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList); taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
} }
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
taosArrayDestroy(pStmt->mtb.tags); taosArrayDestroy(pStmt->mtb.tags);
tfree(pStmt->mtb.sqlstr); tfree(pStmt->mtb.sqlstr);
} }
...@@ -1817,6 +1821,7 @@ int taos_stmt_close(TAOS_STMT* stmt) { ...@@ -1817,6 +1821,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
} else { } else {
tscFreeSqlObj(pStmt->pSql); tscFreeSqlObj(pStmt->pSql);
} }
tfree(pStmt); tfree(pStmt);
STMT_RET(TSDB_CODE_SUCCESS); STMT_RET(TSDB_CODE_SUCCESS);
} }
...@@ -1994,6 +1999,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) { ...@@ -1994,6 +1999,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT* stmt) {
return NULL; return NULL;
} }
TAOS_RES* result = pStmt->pSql; TAOS_RES* result = pStmt->pSql;
pStmt->pSql = NULL;
return result; return result;
} }
......
...@@ -3486,6 +3486,27 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) { ...@@ -3486,6 +3486,27 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
return true; return true;
} }
bool groupbyTbname(SQueryInfo* pQueryInfo) {
if (pQueryInfo->groupbyExpr.columnInfo == NULL ||
taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo) == 0) {
return false;
}
size_t s = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
for (int32_t i = 0; i < s; i++) {
SColIndex* colIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i);
if (colIndex->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
return true;
}
}
return false;
}
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) { static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
int32_t startIdx = 0; int32_t startIdx = 0;
int32_t aggUdf = 0; int32_t aggUdf = 0;
...@@ -7114,6 +7135,35 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* ...@@ -7114,6 +7135,35 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
} }
} }
int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "TWA/Diff/Derivative/Irate are not allowed to apply to super table without group by tbname";
int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);
size_t upNum = taosArrayGetSize(pQueryInfo->pUpstream);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF) {
for (int32_t j = 0; j < upNum; ++j) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, j);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pUp, 0);
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if ((!isSTable) || groupbyTbname(pUp)) {
return TSDB_CODE_SUCCESS;
}
}
return invalidOperationMsg(msg, msg1);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) { int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
const char* msg1 = "only one expression allowed"; const char* msg1 = "only one expression allowed";
const char* msg2 = "invalid expression in select clause"; const char* msg2 = "invalid expression in select clause";
...@@ -8088,6 +8138,10 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis ...@@ -8088,6 +8138,10 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
int32_t num = (int32_t)taosArrayGetSize(sub->pSubquery); int32_t num = (int32_t)taosArrayGetSize(sub->pSubquery);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SSqlNode* p = taosArrayGetP(sub->pSubquery, i); SSqlNode* p = taosArrayGetP(sub->pSubquery, i);
if (p->from == NULL) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (p->from->type == SQL_NODE_FROM_TABLELIST) { if (p->from->type == SQL_NODE_FROM_TABLELIST) {
int32_t code = getTableNameFromSqlNode(p, tableNameList, msgBuf, pSql); int32_t code = getTableNameFromSqlNode(p, tableNameList, msgBuf, pSql);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -8663,6 +8717,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8663,6 +8717,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return code; return code;
} }
if ((code = validateFunctionFromUpstream(pQueryInfo, tscGetErrorMsgPayload(pCmd))) != TSDB_CODE_SUCCESS) {
return code;
}
// updateFunctionInterBuf(pQueryInfo, false); // updateFunctionInterBuf(pQueryInfo, false);
updateLastScanOrderIfNeeded(pQueryInfo); updateLastScanOrderIfNeeded(pQueryInfo);
......
...@@ -1892,7 +1892,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { ...@@ -1892,7 +1892,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
return pRes->code; return pRes->code;
} }
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
} else { } else {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
} }
...@@ -2807,7 +2807,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2807,7 +2807,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
(tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE))) { !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE))) {
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
} }
if (pSql->pSubscription != NULL) { if (pSql->pSubscription != NULL) {
...@@ -3011,6 +3011,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool ...@@ -3011,6 +3011,8 @@ int32_t tscGetTableMetaImpl(SSqlObj* pSql, STableMetaInfo *pTableMetaInfo, bool
return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate); return getTableMetaFromMnode(pSql, pTableMetaInfo, autocreate);
} }
} }
tscDebug("0x%"PRIx64 " %s retrieve tableMeta from cache, numOfCols:%d, numOfTags:%d", pSql->self, name, pMeta->tableInfo.numOfColumns, pMeta->tableInfo.numOfTags);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -2435,7 +2435,11 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { ...@@ -2435,7 +2435,11 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle; SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg; SPair* p = pSchedMsg->msg;
for(int32_t i = p->first; i < p->second; ++i) { for (int32_t i = p->first; i < p->second; ++i) {
if (i >= pSql->subState.numOfSub) {
tfree(p);
return;
}
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
SRetrieveSupport* pSupport = pSub->param; SRetrieveSupport* pSupport = pSub->param;
...@@ -2575,7 +2579,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2575,7 +2579,12 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK; int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
assert(numOfTasks >= 1); assert(numOfTasks >= 1);
int32_t num = (pState->numOfSub/numOfTasks) + 1; int32_t num;
if (pState->numOfSub / numOfTasks == MAX_REQUEST_PER_TASK) {
num = MAX_REQUEST_PER_TASK;
} else {
num = pState->numOfSub / numOfTasks + 1;
}
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks); tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) { for(int32_t j = 0; j < numOfTasks; ++j) {
...@@ -3402,6 +3411,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3402,6 +3411,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
} }
if (numOfRes == 0) { // no result any more, free all subquery objects if (numOfRes == 0) { // no result any more, free all subquery objects
pSql->res.completed = true;
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
return; return;
} }
...@@ -3448,6 +3458,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3448,6 +3458,8 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes); char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes); memcpy(data, pData, bytes * numOfRes);
pRes->dataConverted = pRes1->dataConverted;
data += bytes * numOfRes; data += bytes * numOfRes;
} }
...@@ -3473,7 +3485,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3473,7 +3485,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize); doArithmeticCalculate(pQueryInfo, pFilePage, rowSize, finalRowSize);
pRes->data = pFilePage->data; pRes->data = pFilePage->data;
tscSetResRawPtr(pRes, pQueryInfo); tscSetResRawPtr(pRes, pQueryInfo, pRes->dataConverted);
} }
void tscBuildResFromSubqueries(SSqlObj *pSql) { void tscBuildResFromSubqueries(SSqlObj *pSql) {
......
...@@ -690,9 +690,13 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo ...@@ -690,9 +690,13 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
} }
if (convertNchar) {
pRes->dataConverted = true;
}
} }
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0); assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
return; return;
...@@ -705,7 +709,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -705,7 +709,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->length[i] = pInfo->field.bytes; pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes; offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, true); setResRawPtrImpl(pRes, pInfo, i, converted ? false : true);
} }
} }
...@@ -3424,6 +3428,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -3424,6 +3428,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->row = 0; pRes->row = 0;
pRes->numOfRows = 0; pRes->numOfRows = 0;
pRes->dataConverted = false;
} }
void tscInitResForMerge(SSqlRes* pRes) { void tscInitResForMerge(SSqlRes* pRes) {
......
# TDengine Connector for Python # TDengine Connector for Python
[TDengine] connector for Python enables python programs to access TDengine, using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications. [TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
## Install ## Install
...@@ -11,8 +12,417 @@ pip install ./TDengine/src/connector/python ...@@ -11,8 +12,417 @@ pip install ./TDengine/src/connector/python
## Source Code ## Source Code
[TDengine] connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python). [TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## License - AGPL ## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine). Keep same with [TDengine](https://github.com/taosdata/TDengine).
# encoding:UTF-8
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
\ No newline at end of file
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
# No need to explicitly close, but ok for you
# result.close()
result = conn.query("select * from log")
for row in result:
print(row)
# No need to explicitly close, but ok for you
# result.close()
# stmt.close()
# conn.close()
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
]
conn.insert_lines(lines)
print("inserted")
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
conn.execute("drop database if exists %s" % dbname)
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
# should explicitly close the result in fetch completed or cause error
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
# explicitly close result while query failed
result.close()
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
# conn.close()
if __name__ == "__main__":
test_query(connect())
\ No newline at end of file
import taos
conn = taos.connect()
conn.execute("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
conn.execute("drop database pytest")
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
time.sleep(0.7)
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
# conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
import taos
import random
conn = taos.connect()
dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consume from latest subscription needs root privilege(for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition")
for i in range(10):
conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
[tool.poetry]
name = "taos"
version = "2.1.0"
description = "TDengine connector for python"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "AGPL-3.0"
readme = "README.md"
[tool.poetry.dependencies]
python = "^2.7 || ^3.4"
typing = "*"
[tool.poetry.dev-dependencies]
pytest = [
{ version = "^4.6", python = "^2.7" },
{ version = "^6.2", python = "^3.7" }
]
pdoc = { version = "^7.1.1", python = "^3.7" }
mypy = { version = "^0.910", python = "^3.6" }
black = { version = "^21.7b0", python = "^3.6" }
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 119
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.11", version="2.1.0",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
# encoding:UTF-8
"""
# TDengine Connector for Python
from .connection import TDengineConnection [TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
from .cursor import TDengineCursor using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
# For some reason, the following is needed for VS Code (through PyLance) to ## Install
```sh
git clone --depth 1 https://github.com/taosdata/TDengine.git
pip install ./TDengine/src/connector/python
```
## Source Code
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine).
"""
from .connection import TaosConnection
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package. # recognize that "error" is a valid module of the "taos" package.
from .error import ProgrammingError from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
try:
import importlib.metadata
__version__ = importlib.metadata.version("taos")
except:
None
# Globals # Globals
threadsafety = 0 threadsafety = 0
paramstyle = 'pyformat' paramstyle = "pyformat"
__all__ = ['connection', 'cursor']
__all__ = [
# functions
"connect",
"new_bind_param",
"new_bind_params",
"new_multi_binds",
"new_multi_bind",
# objects
"TaosBind",
"TaosConnection",
"TaosCursor",
"TaosResult",
"TaosRows",
"TaosRow",
"TaosStmt",
"PrecisionEnum",
]
def connect(*args, **kwargs): def connect(*args, **kwargs):
""" Function to return a TDengine connector object # type: (..., ...) -> TaosConnection
"""Function to return a TDengine connector object
Current supporting keyword parameters: Current supporting keyword parameters:
@dsn: Data source name as string @dsn: Data source name as string
...@@ -25,4 +483,4 @@ def connect(*args, **kwargs): ...@@ -25,4 +483,4 @@ def connect(*args, **kwargs):
@rtype: TDengineConnector @rtype: TDengineConnector
""" """
return TDengineConnection(*args, **kwargs) return TaosConnection(*args, **kwargs)
# encoding:UTF-8
import ctypes
from .constants import FieldType
from .error import *
from .precision import *
from datetime import datetime
from ctypes import *
import sys
_datetime_epoch = datetime.utcfromtimestamp(0)
def _is_not_none(obj):
return obj != None
class TaosBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_size_t)),
("is_null", POINTER(c_int)),
("is_unsigned", c_int),
("error", POINTER(c_int)),
("u", c_int64),
("allocated", c_int),
]
def null(self):
self.buffer_type = FieldType.C_NULL
self.is_null = pointer(c_int(1))
def bool(self, value):
self.buffer_type = FieldType.C_BOOL
self.buffer = cast(pointer(c_bool(value)), c_void_p)
self.buffer_length = sizeof(c_bool)
def tinyint(self, value):
self.buffer_type = FieldType.C_TINYINT
self.buffer = cast(pointer(c_int8(value)), c_void_p)
self.buffer_length = sizeof(c_int8)
def smallint(self, value):
self.buffer_type = FieldType.C_SMALLINT
self.buffer = cast(pointer(c_int16(value)), c_void_p)
self.buffer_length = sizeof(c_int16)
def int(self, value):
self.buffer_type = FieldType.C_INT
self.buffer = cast(pointer(c_int32(value)), c_void_p)
self.buffer_length = sizeof(c_int32)
def bigint(self, value):
self.buffer_type = FieldType.C_BIGINT
self.buffer = cast(pointer(c_int64(value)), c_void_p)
self.buffer_length = sizeof(c_int64)
def float(self, value):
self.buffer_type = FieldType.C_FLOAT
self.buffer = cast(pointer(c_float(value)), c_void_p)
self.buffer_length = sizeof(c_float)
def double(self, value):
self.buffer_type = FieldType.C_DOUBLE
self.buffer = cast(pointer(c_double(value)), c_void_p)
self.buffer_length = sizeof(c_double)
def binary(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_BINARY
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def timestamp(self, value, precision=PrecisionEnum.Milliseconds):
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round((value - _datetime_epoch).total_seconds() * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
ts = int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round(value * 10000000))
else:
raise PrecisionError("time float do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
ts = value
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
ts = int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
ts = int(round(value * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(pointer(c_int64(ts)), c_void_p)
self.buffer_length = sizeof(c_int64)
def nchar(self, value):
buffer = None
length = 0
if isinstance(value, str):
bytes = value.encode("utf-8")
buffer = create_string_buffer(bytes)
length = len(bytes)
else:
buffer = value
length = len(value)
self.buffer_type = FieldType.C_NCHAR
self.buffer = cast(buffer, c_void_p)
self.buffer_length = length
self.length = pointer(c_size_t(self.buffer_length))
def tinyint_unsigned(self, value):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer = cast(pointer(c_uint8(value)), c_void_p)
self.buffer_length = sizeof(c_uint8)
def smallint_unsigned(self, value):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer = cast(pointer(c_uint16(value)), c_void_p)
self.buffer_length = sizeof(c_uint16)
def int_unsigned(self, value):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer = cast(pointer(c_uint32(value)), c_void_p)
self.buffer_length = sizeof(c_uint32)
def bigint_unsigned(self, value):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer = cast(pointer(c_uint64(value)), c_void_p)
self.buffer_length = sizeof(c_uint64)
def _datetime_to_timestamp(value, precision):
# type: (datetime | float | int | str | c_int64, PrecisionEnum) -> c_int64
if value is None:
return FieldType.C_BIGINT_NULL
if type(value) is datetime:
if precision == PrecisionEnum.Milliseconds:
return int(round((value - _datetime_epoch).total_seconds() * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round((value - _datetime_epoch).total_seconds() * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif type(value) is float:
if precision == PrecisionEnum.Milliseconds:
return int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round(value * 10000000))
else:
raise PrecisionError("time float do not support nanosecond precision")
elif isinstance(value, int) and not isinstance(value, bool):
return c_int64(value)
elif isinstance(value, str):
value = datetime.fromisoformat(value)
if precision == PrecisionEnum.Milliseconds:
return int(round(value * 1000))
elif precision == PrecisionEnum.Microseconds:
return int(round(value * 10000000))
else:
raise PrecisionError("datetime do not support nanosecond precision")
elif isinstance(value, c_int64):
return value
return FieldType.C_BIGINT_NULL
class TaosMultiBind(ctypes.Structure):
_fields_ = [
("buffer_type", c_int),
("buffer", c_void_p),
("buffer_length", c_size_t),
("length", POINTER(c_int32)),
("is_null", c_char_p),
("num", c_int),
]
def null(self, num):
self.buffer_type = FieldType.C_NULL
self.is_null = cast((c_char * num)(*[1 for _ in range(num)]), c_char_p)
self.buffer = c_void_p(None)
self.num = num
def bool(self, values):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BOOL_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
self.buffer_type = FieldType.C_BOOL
self.buffer_length = sizeof(c_bool)
def tinyint(self, values):
self.buffer_type = FieldType.C_TINYINT
self.buffer_length = sizeof(c_int8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint(self, values):
self.buffer_type = FieldType.C_SMALLINT
self.buffer_length = sizeof(c_int16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int(self, values):
self.buffer_type = FieldType.C_INT
self.buffer_length = sizeof(c_int32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint(self, values):
self.buffer_type = FieldType.C_BIGINT
self.buffer_length = sizeof(c_int64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def float(self, values):
self.buffer_type = FieldType.C_FLOAT
self.buffer_length = sizeof(c_float)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_float * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_FLOAT_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def double(self, values):
self.buffer_type = FieldType.C_DOUBLE
self.buffer_length = sizeof(c_double)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_double * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_DOUBLE_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def _str_to_buffer(self, values):
self.num = len(values)
is_null = [1 if v == None else 0 for v in values]
self.is_null = cast((c_byte * self.num)(*is_null), c_char_p)
if sum(is_null) == self.num:
self.length = (c_int32 * len(values))(0 * self.num)
return
if sys.version_info < (3, 0):
_bytes = [bytes(value) if value is not None else None for value in values]
buffer_length = max(len(b) + 1 for b in _bytes if b is not None)
buffers = [
create_string_buffer(b, buffer_length) if b is not None else create_string_buffer(buffer_length)
for b in _bytes
]
buffer_all = b''.join(v[:] for v in buffers)
self.buffer = cast(c_char_p(buffer_all), c_void_p)
else:
_bytes = [value.encode("utf-8") if value is not None else None for value in values]
buffer_length = max(len(b) for b in _bytes if b is not None)
self.buffer = cast(
c_char_p(
b"".join(
[
create_string_buffer(b, buffer_length)
if b is not None
else create_string_buffer(buffer_length)
for b in _bytes
]
)
),
c_void_p,
)
self.length = (c_int32 * len(values))(*[len(b) if b is not None else 0 for b in _bytes])
self.buffer_length = buffer_length
def binary(self, values):
self.buffer_type = FieldType.C_BINARY
self._str_to_buffer(values)
def timestamp(self, values, precision=PrecisionEnum.Milliseconds):
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_int64 * len(values)
buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values])
self.buffer_type = FieldType.C_TIMESTAMP
self.buffer = cast(buffer, c_void_p)
self.buffer_length = sizeof(c_int64)
self.num = len(values)
def nchar(self, values):
# type: (list[str]) -> None
self.buffer_type = FieldType.C_NCHAR
self._str_to_buffer(values)
def tinyint_unsigned(self, values):
self.buffer_type = FieldType.C_TINYINT_UNSIGNED
self.buffer_length = sizeof(c_uint8)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint8 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_TINYINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def smallint_unsigned(self, values):
self.buffer_type = FieldType.C_SMALLINT_UNSIGNED
self.buffer_length = sizeof(c_uint16)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint16 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_SMALLINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def int_unsigned(self, values):
self.buffer_type = FieldType.C_INT_UNSIGNED
self.buffer_length = sizeof(c_uint32)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint32 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_INT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def bigint_unsigned(self, values):
self.buffer_type = FieldType.C_BIGINT_UNSIGNED
self.buffer_length = sizeof(c_uint64)
try:
buffer = cast(values, c_void_p)
except:
buffer_type = c_uint64 * len(values)
try:
buffer = buffer_type(*values)
except:
buffer = buffer_type(*[v if v is not None else FieldType.C_BIGINT_UNSIGNED_NULL for v in values])
self.buffer = cast(buffer, c_void_p)
self.num = len(values)
def new_bind_param():
# type: () -> TaosBind
return TaosBind()
def new_bind_params(size):
# type: (int) -> Array[TaosBind]
return (TaosBind * size)()
def new_multi_bind():
# type: () -> TaosMultiBind
return TaosMultiBind()
def new_multi_binds(size):
# type: (int) -> Array[TaosMultiBind]
return (TaosMultiBind * size)()
from .cursor import TDengineCursor # encoding:UTF-8
from .subscription import TDengineSubscription from types import FunctionType
from .cinterface import CTaosInterface from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
class TDengineConnection(object): class TaosConnection(object):
""" TDengine connection object """TDengine connection object"""
"""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._conn = None self._conn = None
...@@ -21,63 +25,130 @@ class TDengineConnection(object): ...@@ -21,63 +25,130 @@ class TDengineConnection(object):
def config(self, **kwargs): def config(self, **kwargs):
# host # host
if 'host' in kwargs: if "host" in kwargs:
self._host = kwargs['host'] self._host = kwargs["host"]
# user # user
if 'user' in kwargs: if "user" in kwargs:
self._user = kwargs['user'] self._user = kwargs["user"]
# password # password
if 'password' in kwargs: if "password" in kwargs:
self._password = kwargs['password'] self._password = kwargs["password"]
# database # database
if 'database' in kwargs: if "database" in kwargs:
self._database = kwargs['database'] self._database = kwargs["database"]
# port # port
if 'port' in kwargs: if "port" in kwargs:
self._port = kwargs['port'] self._port = kwargs["port"]
# config # config
if 'config' in kwargs: if "config" in kwargs:
self._config = kwargs['config'] self._config = kwargs["config"]
self._chandle = CTaosInterface(self._config) self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect( self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
self._host,
self._user,
self._password,
self._database,
self._port)
def close(self): def close(self):
"""Close current connection. """Close current connection."""
""" if self._conn:
return CTaosInterface.close(self._conn) taos_close(self._conn)
self._conn = None
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription. @property
""" def client_info(self):
# type: () -> str
return taos_get_client_info()
@property
def server_info(self):
# type: () -> str
return taos_get_server_info(self._conn)
def select_db(self, database):
# type: (str) -> None
taos_select_db(self._conn, database)
def execute(self, sql):
# type: (str) -> None
"""Simplely execute sql ignoring the results"""
res = taos_query(self._conn, sql)
taos_free_result(res)
def query(self, sql):
# type: (str) -> TaosResult
result = taos_query(self._conn, sql)
return TaosResult(result, True, self)
def query_a(self, sql, callback, param):
# type: (str, async_query_callback_type, c_void_p) -> None
"""Asynchronously query a sql with callback function"""
taos_query_a(self._conn, sql, callback, param)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
# type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
"""Create a subscription."""
if self._conn is None: if self._conn is None:
return None return None
sub = CTaosInterface.subscribe( sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
self._conn, restart, topic, sql, interval) return TaosSubscription(sub, callback != None)
return TDengineSubscription(sub)
def insertLines(self, lines): def statement(self, sql=None):
""" # type: (str | None) -> TaosStmt
insert lines through line protocol
"""
if self._conn is None: if self._conn is None:
return None return None
return CTaosInterface.insertLines(self._conn, lines) stmt = taos_stmt_init(self._conn)
if sql != None:
def cursor(self): taos_stmt_prepare(stmt, sql)
"""Return a new Cursor object using the connection.
return TaosStmt(stmt)
def load_table_info(self, tables):
# type: (str) -> None
taos_load_table_info(self._conn, tables)
def stream(self, sql, callback, stime=0, param=None, callback2=None):
# type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
# cb = cast(callback, stream_callback_type)
# ref = byref(cb)
stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
return TaosStream(stream)
def insert_lines(self, lines):
# type: (list[str]) -> None
"""Line protocol and schemaless support
## Example
```python
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.insert_lines(lines)
```
## Exception
```python
try:
conn.insert_lines(lines)
except SchemalessError as err:
print(err)
```
""" """
return TDengineCursor(self) return taos_insert_lines(self._conn, lines)
def cursor(self):
# type: () -> TaosCursor
"""Return a new Cursor object using the connection."""
return TaosCursor(self)
def commit(self): def commit(self):
"""Commit any pending transaction to the database. """Commit any pending transaction to the database.
...@@ -87,17 +158,18 @@ class TDengineConnection(object): ...@@ -87,17 +158,18 @@ class TDengineConnection(object):
pass pass
def rollback(self): def rollback(self):
"""Void functionality """Void functionality"""
"""
pass pass
def clear_result_set(self): def clear_result_set(self):
"""Clear unused result set on this connection. """Clear unused result set on this connection."""
"""
pass pass
def __del__(self):
self.close()
if __name__ == "__main__": if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107') conn = TaosConnection()
conn.close() conn.close()
print("Hello world") print("Hello world")
# encoding:UTF-8
"""Constants in TDengine python """Constants in TDengine python
""" """
from .dbapi import * import ctypes, struct
class FieldType(object): class FieldType(object):
"""TDengine Field Types """TDengine Field Types"""
"""
# type_code # type_code
C_NULL = 0 C_NULL = 0
C_BOOL = 1 C_BOOL = 1
...@@ -34,9 +36,9 @@ class FieldType(object): ...@@ -34,9 +36,9 @@ class FieldType(object):
C_INT_UNSIGNED_NULL = 4294967295 C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808 C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615 C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan') C_FLOAT_NULL = ctypes.c_float(struct.unpack("<f", b"\x00\x00\xf0\x7f")[0])
C_DOUBLE_NULL = float('nan') C_DOUBLE_NULL = ctypes.c_double(struct.unpack("<d", b"\x00\x00\x00\x00\x00\xff\xff\x7f")[0])
C_BINARY_NULL = bytearray([int('0xff', 16)]) C_BINARY_NULL = bytearray([int("0xff", 16)])
# Timestamp precision definition # Timestamp precision definition
C_TIMESTAMP_MILLI = 0 C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1 C_TIMESTAMP_MICRO = 1
......
from .cinterface import CTaosInterface # encoding:UTF-8
from .cinterface import *
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
from .result import *
# querySeqNum = 0
class TaosCursor(object):
class TDengineCursor(object):
"""Database cursor which is used to manage the context of a fetch operation. """Database cursor which is used to manage the context of a fetch operation.
Attributes: Attributes:
.description: Read-only attribute consists of 7-item sequences: .description: Read-only attribute consists of 7-item sequences:
> name (mondatory) > name (mandatory)
> type_code (mondatory) > type_code (mandatory)
> display_size > display_size
> internal_size > internal_size
> precision > precision
...@@ -55,8 +55,7 @@ class TDengineCursor(object): ...@@ -55,8 +55,7 @@ class TDengineCursor(object):
raise OperationalError("Invalid use of fetch iterator") raise OperationalError("Invalid use of fetch iterator")
if self._block_rows <= self._block_iter: if self._block_rows <= self._block_iter:
block, self._block_rows = CTaosInterface.fetchRow( block, self._block_rows = taos_fetch_row(self._result, self._fields)
self._result, self._fields)
if self._block_rows == 0: if self._block_rows == 0:
raise StopIteration raise StopIteration
self._block = list(map(tuple, zip(*block))) self._block = list(map(tuple, zip(*block)))
...@@ -69,20 +68,17 @@ class TDengineCursor(object): ...@@ -69,20 +68,17 @@ class TDengineCursor(object):
@property @property
def description(self): def description(self):
"""Return the description of the object. """Return the description of the object."""
"""
return self._description return self._description
@property @property
def rowcount(self): def rowcount(self):
"""Return the rowcount of the object """Return the rowcount of the object"""
"""
return self._rowcount return self._rowcount
@property @property
def affected_rows(self): def affected_rows(self):
"""Return the rowcount of insertion """Return the rowcount of insertion"""
"""
return self._affected_rows return self._affected_rows
def callproc(self, procname, *args): def callproc(self, procname, *args):
...@@ -96,8 +92,7 @@ class TDengineCursor(object): ...@@ -96,8 +92,7 @@ class TDengineCursor(object):
self._logfile = logfile self._logfile = logfile
def close(self): def close(self):
"""Close the cursor. """Close the cursor."""
"""
if self._connection is None: if self._connection is None:
return False return False
...@@ -107,8 +102,7 @@ class TDengineCursor(object): ...@@ -107,8 +102,7 @@ class TDengineCursor(object):
return True return True
def execute(self, operation, params=None): def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command). """Prepare and execute a database operation (query or command)."""
"""
if not operation: if not operation:
return None return None
...@@ -124,104 +118,91 @@ class TDengineCursor(object): ...@@ -124,104 +118,91 @@ class TDengineCursor(object):
# global querySeqNum # global querySeqNum
# querySeqNum += 1 # querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition # localSeqNum = querySeqNum # avoid race condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt))) # print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt) self._result = taos_query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum)) # print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile): if self._logfile:
with open(self._logfile, "a") as logfile: with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation) logfile.write("%s;\n" % operation)
errno = CTaosInterface.libtaos.taos_errno(self._result) if taos_field_count(self._result) == 0:
if errno == 0: affected_rows = taos_affected_rows(self._result)
if CTaosInterface.fieldsCount(self._result) == 0: self._affected_rows += affected_rows
self._affected_rows += CTaosInterface.affectedRows( return affected_rows
self._result)
return CTaosInterface.affectedRows(self._result)
else:
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else: else:
raise ProgrammingError( self._fields = taos_fetch_fields(self._result)
CTaosInterface.errStr( return self._handle_result()
self._result), errno)
def executemany(self, operation, seq_of_parameters): def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters. """Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters."""
"""
pass pass
def fetchone(self): def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or None when no more data is available. """Fetch the next row of a query result set, returning a single sequence, or None when no more data is available."""
"""
pass pass
def fetchmany(self): def fetchmany(self):
pass pass
def istype(self, col, dataType): def istype(self, col, dataType):
if (dataType.upper() == "BOOL"): if dataType.upper() == "BOOL":
if (self._description[col][1] == FieldType.C_BOOL): if self._description[col][1] == FieldType.C_BOOL:
return True return True
if (dataType.upper() == "TINYINT"): if dataType.upper() == "TINYINT":
if (self._description[col][1] == FieldType.C_TINYINT): if self._description[col][1] == FieldType.C_TINYINT:
return True return True
if (dataType.upper() == "TINYINT UNSIGNED"): if dataType.upper() == "TINYINT UNSIGNED":
if (self._description[col][1] == FieldType.C_TINYINT_UNSIGNED): if self._description[col][1] == FieldType.C_TINYINT_UNSIGNED:
return True return True
if (dataType.upper() == "SMALLINT"): if dataType.upper() == "SMALLINT":
if (self._description[col][1] == FieldType.C_SMALLINT): if self._description[col][1] == FieldType.C_SMALLINT:
return True return True
if (dataType.upper() == "SMALLINT UNSIGNED"): if dataType.upper() == "SMALLINT UNSIGNED":
if (self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED): if self._description[col][1] == FieldType.C_SMALLINT_UNSIGNED:
return True return True
if (dataType.upper() == "INT"): if dataType.upper() == "INT":
if (self._description[col][1] == FieldType.C_INT): if self._description[col][1] == FieldType.C_INT:
return True return True
if (dataType.upper() == "INT UNSIGNED"): if dataType.upper() == "INT UNSIGNED":
if (self._description[col][1] == FieldType.C_INT_UNSIGNED): if self._description[col][1] == FieldType.C_INT_UNSIGNED:
return True return True
if (dataType.upper() == "BIGINT"): if dataType.upper() == "BIGINT":
if (self._description[col][1] == FieldType.C_BIGINT): if self._description[col][1] == FieldType.C_BIGINT:
return True return True
if (dataType.upper() == "BIGINT UNSIGNED"): if dataType.upper() == "BIGINT UNSIGNED":
if (self._description[col][1] == FieldType.C_BIGINT_UNSIGNED): if self._description[col][1] == FieldType.C_BIGINT_UNSIGNED:
return True return True
if (dataType.upper() == "FLOAT"): if dataType.upper() == "FLOAT":
if (self._description[col][1] == FieldType.C_FLOAT): if self._description[col][1] == FieldType.C_FLOAT:
return True return True
if (dataType.upper() == "DOUBLE"): if dataType.upper() == "DOUBLE":
if (self._description[col][1] == FieldType.C_DOUBLE): if self._description[col][1] == FieldType.C_DOUBLE:
return True return True
if (dataType.upper() == "BINARY"): if dataType.upper() == "BINARY":
if (self._description[col][1] == FieldType.C_BINARY): if self._description[col][1] == FieldType.C_BINARY:
return True return True
if (dataType.upper() == "TIMESTAMP"): if dataType.upper() == "TIMESTAMP":
if (self._description[col][1] == FieldType.C_TIMESTAMP): if self._description[col][1] == FieldType.C_TIMESTAMP:
return True return True
if (dataType.upper() == "NCHAR"): if dataType.upper() == "NCHAR":
if (self._description[col][1] == FieldType.C_NCHAR): if self._description[col][1] == FieldType.C_NCHAR:
return True return True
return False return False
def fetchall_row(self): def fetchall_row(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation. """Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation."""
"""
if self._result is None or self._fields is None: if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(self._fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchRow( block, num_of_fields = taos_fetch_row(self._result, self._fields)
self._result, self._fields) errno = taos_errno(self._result)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError( raise ProgrammingError(taos_errstr(self._result), errno)
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
...@@ -230,19 +211,16 @@ class TDengineCursor(object): ...@@ -230,19 +211,16 @@ class TDengineCursor(object):
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def fetchall(self): def fetchall(self):
if self._result is None or self._fields is None: if self._result is None:
raise OperationalError("Invalid use of fetchall") raise OperationalError("Invalid use of fetchall")
fields = self._fields if self._fields is not None else taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))] buffer = [[] for i in range(len(fields))]
self._rowcount = 0 self._rowcount = 0
while True: while True:
block, num_of_fields = CTaosInterface.fetchBlock( block, num_of_fields = taos_fetch_block(self._result, self._fields)
self._result, self._fields) errno = taos_errno(self._result)
errno = CTaosInterface.libtaos.taos_errno(self._result)
if errno != 0: if errno != 0:
raise ProgrammingError( raise ProgrammingError(taos_errstr(self._result), errno)
CTaosInterface.errStr(
self._result), errno)
if num_of_fields == 0: if num_of_fields == 0:
break break
self._rowcount += num_of_fields self._rowcount += num_of_fields
...@@ -250,9 +228,12 @@ class TDengineCursor(object): ...@@ -250,9 +228,12 @@ class TDengineCursor(object):
buffer[i].extend(block[i]) buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer))) return list(map(tuple, zip(*buffer)))
def stop_query(self):
if self._result != None:
taos_stop_query(self._result)
def nextset(self): def nextset(self):
""" """ """
"""
pass pass
def setinputsize(self, sizes): def setinputsize(self, sizes):
...@@ -262,12 +243,11 @@ class TDengineCursor(object): ...@@ -262,12 +243,11 @@ class TDengineCursor(object):
pass pass
def _reset_result(self): def _reset_result(self):
"""Reset the result to unused version. """Reset the result to unused version."""
"""
self._description = [] self._description = []
self._rowcount = -1 self._rowcount = -1
if self._result is not None: if self._result is not None:
CTaosInterface.freeResult(self._result) taos_free_result(self._result)
self._result = None self._result = None
self._fields = None self._fields = None
self._block = None self._block = None
...@@ -276,11 +256,12 @@ class TDengineCursor(object): ...@@ -276,11 +256,12 @@ class TDengineCursor(object):
self._affected_rows = 0 self._affected_rows = 0
def _handle_result(self): def _handle_result(self):
"""Handle the return result from query. """Handle the return result from query."""
"""
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
self._description.append( self._description.append((ele["name"], ele["type"], None, None, None, None, False))
(ele['name'], ele['type'], None, None, None, None, False))
return self._result return self._result
def __del__(self):
self.close()
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
# encoding:UTF-8
"""Python exceptions """Python exceptions
""" """
class Error(Exception): class Error(Exception):
def __init__(self, msg=None, errno=None): def __init__(self, msg=None, errno=0xffff):
self.msg = msg self.msg = msg
self._full_msg = self.msg
self.errno = errno self.errno = errno
self._full_msg = "[0x%04x]: %s" % (self.errno & 0xffff, self.msg)
def __str__(self): def __str__(self):
return self._full_msg return self._full_msg
class Warning(Exception): class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting. """Exception raised for important warnings like data truncations while inserting."""
"""
pass pass
class InterfaceError(Error): class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the database itself. """Exception raised for errors that are related to the database interface rather than the database itself."""
"""
pass pass
class DatabaseError(Error): class DatabaseError(Error):
"""Exception raised for errors that are related to the database. """Exception raised for errors that are related to the database."""
"""
pass pass
class ConnectionError(Error):
"""Exceptin raised for connection failed"""
pass
class DataError(DatabaseError): class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range. """Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range."""
"""
pass pass
class OperationalError(DatabaseError): class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer """Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer"""
"""
pass pass
class IntegrityError(DatabaseError): class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected. """Exception raised when the relational integrity of the database is affected."""
"""
pass pass
class InternalError(DatabaseError): class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error. """Exception raised when the database encounters an internal error."""
"""
pass pass
class ProgrammingError(DatabaseError): class ProgrammingError(DatabaseError):
"""Exception raised for programming errors. """Exception raised for programming errors."""
"""
pass pass
class NotSupportedError(DatabaseError): class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the database,. """Exception raised in case a method or database API was used which is not supported by the database,."""
"""
pass pass
class StatementError(DatabaseError):
"""Exception raised in STMT API."""
pass
class ResultError(DatabaseError):
"""Result related APIs."""
pass
class LinesError(DatabaseError):
"""taos_insert_lines errors."""
pass
\ No newline at end of file
# encoding:UTF-8
import ctypes
import math
import datetime
from ctypes import *
from .constants import FieldType
from .error import *
_datetime_epoch = datetime.datetime.fromtimestamp(0)
def _convert_millisecond_to_datetime(milli):
return _datetime_epoch + datetime.timedelta(seconds=milli / 1000.0)
def _convert_microsecond_to_datetime(micro):
return _datetime_epoch + datetime.timedelta(seconds=micro / 1000000.0)
def _convert_nanosecond_to_datetime(nanosec):
return nanosec
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
_timestamp_converter = _convert_millisecond_to_datetime
if precision == FieldType.C_TIMESTAMP_MILLI:
_timestamp_converter = _convert_millisecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_MICRO:
_timestamp_converter = _convert_microsecond_to_datetime
elif precision == FieldType.C_TIMESTAMP_NANO:
_timestamp_converter = _convert_nanosecond_to_datetime
else:
raise DatabaseError("Unknown precision returned from database")
return [
None if ele == FieldType.C_BIGINT_NULL else _timestamp_converter(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bool_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bool row to python row"""
return [
None if ele == FieldType.C_BOOL_NULL else bool(ele)
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[: abs(num_of_rows)]
]
def _crow_tinyint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C tinyint row to python row"""
return [
None if ele == FieldType.C_TINYINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ubyte))[: abs(num_of_rows)]
]
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[: abs(num_of_rows)]
]
def _crow_smallint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C smallint row to python row"""
return [
None if ele == FieldType.C_SMALLINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_ushort))[: abs(num_of_rows)]
]
def _crow_int_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[: abs(num_of_rows)]
]
def _crow_int_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C int row to python row"""
return [
None if ele == FieldType.C_INT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint))[: abs(num_of_rows)]
]
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[: abs(num_of_rows)]
]
def _crow_bigint_unsigned_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C bigint row to python row"""
return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_uint64))[: abs(num_of_rows)]
]
def _crow_float_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C float row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[: abs(num_of_rows)]
]
def _crow_double_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C double row to python row"""
return [
None if math.isnan(ele) else ele
for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[: abs(num_of_rows)]
]
def _crow_binary_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
return [
None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode("utf-8")
for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[: abs(num_of_rows)]
]
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
if num_of_rows >= 0:
tmpstr = ctypes.c_char_p(data)
res.append(tmpstr.value.decode())
else:
res.append(
(
ctypes.cast(
data + nbytes * i,
ctypes.POINTER(ctypes.c_wchar * (nbytes // 4)),
)
)[0].value
)
except ValueError:
res.append(None)
return res
def _crow_binary_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C binary row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
rbyte = ctypes.cast(data + nbytes * i, ctypes.POINTER(ctypes.c_short))[:1].pop()
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode()[0:rbyte])
except ValueError:
res.append(None)
return res
def _crow_nchar_to_python_block(data, num_of_rows, nbytes=None, precision=FieldType.C_TIMESTAMP_UNKNOWN):
"""Function to convert C nchar row to python row"""
assert nbytes is not None
res = []
for i in range(abs(num_of_rows)):
try:
tmpstr = ctypes.c_char_p(data + nbytes * i + 2)
res.append(tmpstr.value.decode())
except ValueError:
res.append(None)
return res
CONVERT_FUNC = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
}
CONVERT_FUNC_BLOCK = {
FieldType.C_BOOL: _crow_bool_to_python,
FieldType.C_TINYINT: _crow_tinyint_to_python,
FieldType.C_SMALLINT: _crow_smallint_to_python,
FieldType.C_INT: _crow_int_to_python,
FieldType.C_BIGINT: _crow_bigint_to_python,
FieldType.C_FLOAT: _crow_float_to_python,
FieldType.C_DOUBLE: _crow_double_to_python,
FieldType.C_BINARY: _crow_binary_to_python_block,
FieldType.C_TIMESTAMP: _crow_timestamp_to_python,
FieldType.C_NCHAR: _crow_nchar_to_python_block,
FieldType.C_TINYINT_UNSIGNED: _crow_tinyint_unsigned_to_python,
FieldType.C_SMALLINT_UNSIGNED: _crow_smallint_unsigned_to_python,
FieldType.C_INT_UNSIGNED: _crow_int_unsigned_to_python,
FieldType.C_BIGINT_UNSIGNED: _crow_bigint_unsigned_to_python,
}
# Corresponding TAOS_FIELD structure in C
class TaosField(ctypes.Structure):
_fields_ = [
("_name", ctypes.c_char * 65),
("_type", ctypes.c_uint8),
("_bytes", ctypes.c_uint16),
]
@property
def name(self):
return self._name.decode("utf-8")
@property
def length(self):
"""alias to self.bytes"""
return self._bytes
@property
def bytes(self):
return self._bytes
@property
def type(self):
return self._type
def __dict__(self):
return {"name": self.name, "type": self.type, "bytes": self.length}
def __str__(self):
return "{name: %s, type: %d, bytes: %d}" % (self.name, self.type, self.length)
def __getitem__(self, item):
return getattr(self, item)
class TaosFields(object):
def __init__(self, fields, count):
if isinstance(fields, c_void_p):
self._fields = cast(fields, POINTER(TaosField))
if isinstance(fields, POINTER(TaosField)):
self._fields = fields
self._count = count
self._iter = 0
def as_ptr(self):
return self._fields
@property
def count(self):
return self._count
@property
def fields(self):
return self._fields
def __next__(self):
return self._next_field()
def next(self):
return self._next_field()
def _next_field(self):
if self._iter < self.count:
field = self._fields[self._iter]
self._iter += 1
return field
else:
raise StopIteration
def __getitem__(self, item):
return self._fields[item]
def __iter__(self):
return self
def __len__(self):
return self.count
class PrecisionEnum(object):
"""Precision enums"""
Milliseconds = 0
Microseconds = 1
Nanoseconds = 2
class PrecisionError(Exception):
"""Python datetime does not support nanoseconds error"""
pass
from .cinterface import *
# from .connection import TaosConnection
from .error import *
class TaosResult(object):
"""TDengine result interface"""
def __init__(self, result, close_after=False, conn=None):
# type: (c_void_p, bool, TaosConnection) -> TaosResult
# to make the __del__ order right
self._conn = conn
self._close_after = close_after
self._result = result
self._fields = None
self._field_count = None
self._precision = None
self._block = None
self._block_length = None
self._row_count = 0
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
# fetch next row
return self._next_row()
def _next_row(self):
if self._result is None or self.fields is None:
raise OperationalError("Invalid use of fetch iterator")
if self._block == None or self._block_iter >= self._block_length:
self._block, self._block_length = self.fetch_block()
self._block_iter = 0
# self._row_count += self._block_length
raw = self._block[self._block_iter]
self._block_iter += 1
return raw
@property
def fields(self):
"""fields definitions of the current result"""
if self._result is None:
raise ResultError("no result object setted")
if self._fields == None:
self._fields = taos_fetch_fields(self._result)
return self._fields
@property
def field_count(self):
"""Field count of the current result, eq to taos_field_count(result)"""
return self.fields.count
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._row_count
@property
def precision(self):
if self._precision == None:
self._precision = taos_result_precision(self._result)
return self._precision
@property
def affected_rows(self):
return taos_affected_rows(self._result)
# @property
def field_lengths(self):
return taos_fetch_lengths(self._result, self.field_count)
def rows_iter(self, num_of_rows=None):
return TaosRows(self, num_of_rows)
def blocks_iter(self):
return TaosBlocks(self)
def fetch_block(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
block, length = taos_fetch_block_raw(self._result)
if length == 0:
raise StopIteration
precision = self.precision
field_count = self.field_count
fields = self.fields
blocks = [None] * field_count
lengths = self.field_lengths()
for i in range(field_count):
data = ctypes.cast(block, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC_BLOCK:
raise DatabaseError("Invalid data type returned from database")
blocks[i] = CONVERT_FUNC_BLOCK[fields[i].type](data, length, lengths[i], precision)
return list(map(tuple, zip(*blocks))), length
def fetch_all(self):
if self._result is None:
raise OperationalError("Invalid use of fetchall")
if self._fields == None:
self._fields = taos_fetch_fields(self._result)
buffer = [[] for i in range(len(self._fields))]
self._row_count = 0
while True:
block, num_of_fields = taos_fetch_block(self._result, self._fields)
errno = taos_errno(self._result)
if errno != 0:
raise ProgrammingError(taos_errstr(self._result), errno)
if num_of_fields == 0:
break
self._row_count += num_of_fields
for i in range(len(self._fields)):
buffer[i].extend(block[i])
return list(map(tuple, zip(*buffer)))
def fetch_rows_a(self, callback, param):
taos_fetch_rows_a(self._result, callback, param)
def stop_query(self):
return taos_stop_query(self._result)
def errno(self):
"""**DO NOT** use this directly unless you know what you are doing"""
return taos_errno(self._result)
def errstr(self):
return taos_errstr(self._result)
def check_error(self, errno=None, close=True):
if errno == None:
errno = self.errno()
if errno != 0:
msg = self.errstr()
self.close()
raise OperationalError(msg, errno)
def close(self):
"""free result object."""
if self._result != None and self._close_after:
taos_free_result(self._result)
self._result = None
self._fields = None
self._field_count = None
self._field_lengths = None
def __del__(self):
self.close()
class TaosRows:
"""TDengine result rows iterator"""
def __init__(self, result, num_of_rows=None):
self._result = result
self._num_of_rows = num_of_rows
def __iter__(self):
return self
def __next__(self):
return self._next_row()
def next(self):
return self._next_row()
def _next_row(self):
if self._result is None:
raise OperationalError("Invalid use of fetch iterator")
if self._num_of_rows != None and self._num_of_rows <= self._result._row_count:
raise StopIteration
row = taos_fetch_row_raw(self._result._result)
if not row:
raise StopIteration
self._result._row_count += 1
return TaosRow(self._result, row)
@property
def row_count(self):
"""Return the rowcount of the object"""
return self._result._row_count
class TaosRow:
def __init__(self, result, row):
self._result = result
self._row = row
def __str__(self):
return taos_print_row(self._row, self._result.fields, self._result.field_count)
def __call__(self):
return self.as_tuple()
def _astuple(self):
return self.as_tuple()
def __iter__(self):
return self.as_tuple()
def as_ptr(self):
return self._row
def as_tuple(self):
precision = self._result.precision
field_count = self._result.field_count
blocks = [None] * field_count
fields = self._result.fields
field_lens = self._result.field_lengths()
for i in range(field_count):
data = ctypes.cast(self._row, ctypes.POINTER(ctypes.c_void_p))[i]
if fields[i].type not in CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
if data is None:
blocks[i] = None
else:
blocks[i] = CONVERT_FUNC[fields[i].type](data, 1, field_lens[i], precision)[0]
return tuple(blocks)
class TaosBlocks:
"""TDengine result blocks iterator"""
def __init__(self, result):
self._result = result
def __iter__(self):
return self
def __next__(self):
return self._result.fetch_block()
def next(self):
return self._result.fetch_block()
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStmt(object):
"""TDengine STMT interface"""
def __init__(self, stmt, conn = None):
self._conn = conn
self._stmt = stmt
def set_tbname(self, name):
"""Set table name if needed.
Note that the set_tbname* method should only used in insert statement
"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname(self._stmt, name)
def prepare(self, sql):
# type: (str) -> None
taos_stmt_prepare(self._stmt, sql)
def set_tbname_tags(self, name, tags):
# type: (str, Array[TaosBind]) -> None
"""Set table name with tags, tags is array of BindParams"""
if self._stmt is None:
raise StatementError("Invalid use of set_tbname")
taos_stmt_set_tbname_tags(self._stmt, name, tags)
def bind_param(self, params, add_batch=True):
# type: (Array[TaosBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param(self._stmt, params)
if add_batch:
taos_stmt_add_batch(self._stmt)
def bind_param_batch(self, binds, add_batch=True):
# type: (Array[TaosMultiBind], bool) -> None
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_bind_param_batch(self._stmt, binds)
if add_batch:
taos_stmt_add_batch(self._stmt)
def add_batch(self):
if self._stmt is None:
raise StatementError("Invalid use of stmt")
taos_stmt_add_batch(self._stmt)
def execute(self):
if self._stmt is None:
raise StatementError("Invalid use of execute")
taos_stmt_execute(self._stmt)
def use_result(self):
result = taos_stmt_use_result(self._stmt)
return TaosResult(result)
def close(self):
"""Close stmt."""
if self._stmt is None:
return
taos_stmt_close(self._stmt)
self._stmt = None
def __del__(self):
self.close()
if __name__ == "__main__":
from taos.connection import TaosConnection
conn = TaosConnection()
stmt = conn.statement("select * from log.log limit 10")
stmt.execute()
result = stmt.use_result()
for row in result:
print(row)
stmt.close()
conn.close()
from taos.cinterface import *
from taos.error import *
from taos.result import *
class TaosStream(object):
"""TDengine Stream interface"""
def __init__(self, stream):
self._raw = stream
def as_ptr(self):
return self._raw
def close(self):
"""Close stmt."""
if self._raw is not None:
taos_close_stream(self._raw)
self._raw = None
def __del__(self):
self.close()
from .cinterface import CTaosInterface from taos.result import TaosResult
from .cinterface import *
from .error import * from .error import *
class TDengineSubscription(object): class TaosSubscription(object):
"""TDengine subscription object """TDengine subscription object"""
"""
def __init__(self, sub): def __init__(self, sub, with_callback = False):
self._sub = sub self._sub = sub
self._with_callback = with_callback
def consume(self): def consume(self):
"""Consume rows of a subscription """Consume rows of a subscription"""
"""
if self._sub is None: if self._sub is None:
raise OperationalError("Invalid use of consume") raise OperationalError("Invalid use of consume")
if self._with_callback:
result, fields = CTaosInterface.consume(self._sub) raise OperationalError("DONOT use consume method in an subscription with callback")
buffer = [[] for i in range(len(fields))] result = taos_consume(self._sub)
while True: return TaosResult(result)
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
if num_of_fields == 0:
break
for i in range(len(fields)):
buffer[i].extend(block[i])
self.fields = fields
return list(map(tuple, zip(*buffer)))
def close(self, keepProgress=True): def close(self, keepProgress=True):
"""Close the Subscription. """Close the Subscription."""
"""
if self._sub is None: if self._sub is None:
return False return False
CTaosInterface.unsubscribe(self._sub, keepProgress) taos_unsubscribe(self._sub, keepProgress)
self._sub = None
return True return True
def __del__(self):
self.close()
if __name__ == "__main__":
from .connection import TaosConnection
if __name__ == '__main__': conn = TaosConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
from .connection import TDengineConnection
conn = TDengineConnection(
host="127.0.0.1",
user="root",
password="taosdata",
database="test")
# Generate a cursor object to run SQL commands # Generate a cursor object to run SQL commands
sub = conn.subscribe(True, "test", "select * from meters;", 1000) sub = conn.subscribe(True, "test", "select * from meters;", 1000)
......
class TimestampType(object):
"""Choose which type that parsing TDengine timestamp data to
- DATETIME: use python datetime.datetime, note that it does not support nanosecond precision,
and python taos will use raw c_int64 as a fallback for nanosecond results.
- NUMPY: use numpy.datetime64 type.
- RAW: use raw c_int64.
- TAOS: use taos' TaosTimestamp.
"""
DATETIME = 0,
NUMPY = 1,
RAW = 2,
TAOS = 3,
class TaosTimestamp:
pass
此差异已折叠。
此差异已折叠。
from taos.cinterface import *
from taos import *
import pytest
@pytest.fixture
def conn():
return connect()
def test_client_info():
print(taos_get_client_info())
None
def test_server_info(conn):
# type: (TaosConnection) -> None
print(conn.client_info)
print(conn.server_info)
None
if __name__ == "__main__":
test_client_info()
test_server_info(connect())
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -32,6 +32,7 @@ char * strnchr(char *haystack, char needle, int32_t len, bool skipquote); ...@@ -32,6 +32,7 @@ char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char ** strsplit(char *src, const char *delim, int32_t *num); char ** strsplit(char *src, const char *delim, int32_t *num);
char * strtolower(char *dst, const char *src); char * strtolower(char *dst, const char *src);
char * strntolower(char *dst, const char *src, int32_t n); char * strntolower(char *dst, const char *src, int32_t n);
char * strntolower_s(char *dst, const char *src, int32_t n);
int64_t strnatoi(char *num, int32_t len); int64_t strnatoi(char *num, int32_t len);
char * strbetween(char *string, char *begin, char *end); char * strbetween(char *string, char *begin, char *end);
char * paGetToken(char *src, char **token, int32_t *tokenLen); char * paGetToken(char *src, char **token, int32_t *tokenLen);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册