提交 2e62d8d8 编写于 作者: H Haojun Liao

[td-225]merge develop.

......@@ -25,15 +25,14 @@ steps:
- master
---
kind: pipeline
name: test_arm64
name: test_arm64_bionic
platform:
os: linux
arch: arm64
steps:
- name: build
image: gcc
image: arm64v8/ubuntu:bionic
commands:
- apt-get update
- apt-get install -y cmake build-essential
......@@ -48,9 +47,87 @@ steps:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm
name: test_arm64_focal
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/ubuntu:focal
commands:
- echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
- apt-get update
- apt-get install -y -qq cmake build-essential
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm64_centos7
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/centos:7
commands:
- yum install -y gcc gcc-c++ make cmake git
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm64_centos8
platform:
os: linux
arch: arm64
steps:
- name: build
image: arm64v8/centos:8
commands:
- dnf install -y gcc gcc-c++ make cmake epel-release git libarchive
- mkdir debug
- cd debug
- cmake .. -DCPUTYPE=aarch64 > /dev/null
- make
trigger:
event:
- pull_request
when:
branch:
- develop
- master
- 2.0
---
kind: pipeline
name: test_arm_bionic
platform:
os: linux
......@@ -73,7 +150,6 @@ steps:
branch:
- develop
- master
---
kind: pipeline
name: build_trusty
......@@ -174,25 +250,3 @@ steps:
- develop
- master
---
kind: pipeline
name: goodbye
platform:
os: linux
arch: amd64
steps:
- name: 64-bit
image: alpine
commands:
- echo 64-bit is good.
when:
branch:
- develop
- master
depends_on:
- test_arm64
- test_amd64
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package app
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package expr
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package expr
import "testing"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package expr
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package expr
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package app
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package app
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package app
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package models
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package models
import "time"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package utils
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package log
import (
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.1.5.0")
SET(TD_VER_NUMBER "2.1.6.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -3,17 +3,17 @@
## <a class="anchor" id="grafana"></a>Grafana
TDengine能够与开源数据可视化系统[Grafana](https://www.grafana.com/)快速集成搭建数据监测报警系统,整个过程无需任何代码开发,TDengine中数据表中内容可以在仪表盘(DashBoard)上进行可视化展现。
TDengine 能够与开源数据可视化系统 [Grafana](https://www.grafana.com/)快速集成搭建数据监测报警系统,整个过程无需任何代码开发,TDengine 中数据表中内容可以在仪表盘(DashBoard)上进行可视化展现。
### 安装Grafana
目前TDengine支持Grafana 5.2.4以上的版本。用户可以根据当前的操作系统,到Grafana官网下载安装包,并执行安装。下载地址如下:https://grafana.com/grafana/download。
目前 TDengine 支持 Grafana 6.2 以上的版本。用户可以根据当前的操作系统,到 Grafana 官网下载安装包,并执行安装。下载地址如下:https://grafana.com/grafana/download。
### 配置Grafana
TDengine的Grafana插件在安装包的/usr/local/taos/connector/grafanaplugin目录下。
TDengine 的 Grafana 插件在安装包的 /usr/local/taos/connector/grafanaplugin 目录下。
CentOS 7.2操作系统为例,将grafanaplugin目录拷贝到/var/lib/grafana/plugins目录下,重新启动grafana即可。
CentOS 7.2 操作系统为例,将 grafanaplugin 目录拷贝到 /var/lib/grafana/plugins 目录下,重新启动 grafana 即可。
```bash
sudo cp -rf /usr/local/taos/connector/grafanaplugin /var/lib/grafana/plugins/tdengine
......
......@@ -427,7 +427,7 @@ TDengine启动后,会自动创建一个监测数据库log,并自动将服务
COMPACT VNODES IN (vg_id1, vg_id2, ...)
```
COMPACT 命令对指定的一个或多个 VGroup 启动碎片重整,系统会通过任务队列尽快安排重整操作的具体执行。COMPACT 指令所需的 VGroup id,可以通过 `SHOW VGROUPS;` 指令的输出结果获取;而且在 `SHOW VGROUPS;` 中会有一个 compacting 列,值为 1 时表示对应的 VGroup 正在进行碎片重整,为 0 时则表示并没有处于重整状态
COMPACT 命令对指定的一个或多个 VGroup 启动碎片重整,系统会通过任务队列尽快安排重整操作的具体执行。COMPACT 指令所需的 VGroup id,可以通过 `SHOW VGROUPS;` 指令的输出结果获取;而且在 `SHOW VGROUPS;` 中会有一个 compacting 列,值为 2 时表示对应的 VGroup 处于排队等待进行重整的状态,值为 1 时表示正在进行碎片重整,为 0 时则表示并没有处于重整状态(未要求进行重整或已经完成重整)
需要注意的是,碎片重整操作会大幅消耗磁盘 I/O。因此在重整进行期间,有可能会影响节点的写入和查询性能,甚至在极端情况下导致短时间的阻写。
......
......@@ -182,7 +182,7 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
- **批量创建数据表**
```mysql
CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
```
以更快的速度批量创建大量数据表(服务器端 2.0.14 及以上版本)。
......
......@@ -165,7 +165,7 @@ Note:
- **Create tables in batches**
```mysql
CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
```
Create a large number of data tables in batches faster. (Server side 2.0. 14 and above)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package dataimport
import (
......
......@@ -144,6 +144,9 @@ keepColumnName 1
# max length of an SQL
# maxSQLLength 65480
# max length of WildCards
# maxWildCardsLength 100
# the maximum number of records allowed for super table time sorting
# maxNumOfOrderedRes 100000
......
......@@ -44,7 +44,8 @@ echo "version=${version}"
#docker manifest rm tdengine/tdengine
#docker manifest rm tdengine/tdengine:${version}
if [ "$verType" == "beta" ]; then
docker manifest rm tdengine/tdengine:latest
docker manifest inspect tdengine/tdengine-beta:latest
docker manifest rm tdengine/tdengine-beta:latest
docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version}
docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest
docker login -u tdengine -p ${passWord} #replace the docker registry username and password
......@@ -52,6 +53,7 @@ if [ "$verType" == "beta" ]; then
docker manifest push tdengine/tdengine-beta:${version}
elif [ "$verType" == "stable" ]; then
docker manifest inspect tdengine/tdengine:latest
docker manifest rm tdengine/tdengine:latest
docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version}
docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest
......
......@@ -35,7 +35,7 @@ fi
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taosd
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh ${script_dir}/startPre.sh"
else
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator\
${script_dir}/remove.sh ${script_dir}/set_core.sh ${script_dir}/startPre.sh ${script_dir}/taosd-dump-cfg.gdb"
......
name: tdengine
base: core18
version: '2.1.5.0'
version: '2.1.6.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.1.5.0
- usr/lib/libtaos.so.2.1.6.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -61,6 +61,7 @@ typedef struct SJoinSupporter {
uint64_t uid; // query table uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
SArray* exprList;
SArray* colCond;
SFieldInfo fieldsInfo;
STagCond tagCond;
SGroupbyExpr groupInfo; // group by info
......@@ -244,8 +245,9 @@ SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw);
int32_t tscTagCondCopy(STagCond* dest, const STagCond* src);
int32_t tscColCondCopy(SArray** dest, const SArray* src, uint64_t uid, int16_t tidx);
void tscTagCondRelease(STagCond* pCond);
void tscColCondRelease(SArray** pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
......@@ -355,6 +357,7 @@ char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
STblCond* tsGetTableFilter(SArray* filters, uint64_t uid, int16_t idx);
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
......
......@@ -339,6 +339,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"multi-tableMeta";
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
if (code == TSDB_CODE_RPC_FQDN_ERROR) {
size_t sz = strlen(tscGetErrorMsgPayload(&sub->cmd));
tscAllocPayload(&pSql->cmd, (int)sz + 1);
memcpy(tscGetErrorMsgPayload(&pSql->cmd), tscGetErrorMsgPayload(&sub->cmd), sz);
}
goto _error;
}
......
此差异已折叠。
......@@ -501,6 +501,15 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
pRes->code = rpcMsg->code;
}
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
if (pRes->code == TSDB_CODE_RPC_FQDN_ERROR) {
if (pEpSet) {
char buf[TSDB_FQDN_LEN + 64] = {0};
tscAllocPayload(pCmd, sizeof(buf));
sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]);
} else {
sprintf(tscGetErrorMsgPayload(pCmd), "%s", tstrerror(pRes->code));
}
}
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
}
......@@ -675,7 +684,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
int32_t srcColFilterSize = tscGetColFilterSerializeLen(pQueryInfo);
int32_t srcColFilterSize = 0;
int32_t srcTagFilterSize = tscGetTagFilterSerializeLen(pQueryInfo);
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
......@@ -686,6 +695,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
int32_t tableSerialize = 0;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pVgroupTables != NULL) {
size_t numOfGroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
......@@ -698,8 +708,15 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
tableSerialize = totalTables * sizeof(STableIdInfo);
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize +
exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
if (pQueryInfo->colCond && taosArrayGetSize(pQueryInfo->colCond) > 0) {
STblCond *pCond = tsGetTableFilter(pQueryInfo->colCond, pTableMeta->id.uid, 0);
if (pCond != NULL && pCond->cond != NULL) {
srcColFilterSize = pCond->len;
}
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize + exprSize + tsBufSize +
tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
}
static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, char *pMsg,
......@@ -957,10 +974,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tableCols[i].colId = htons(pCol->colId);
pQueryMsg->tableCols[i].bytes = htons(pCol->bytes);
pQueryMsg->tableCols[i].type = htons(pCol->type);
pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
//pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters);
pQueryMsg->tableCols[i].flist.numOfFilters = 0;
// append the filter information after the basic column information
serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
//serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg);
}
if (pQueryInfo->colCond && taosArrayGetSize(pQueryInfo->colCond) > 0 && !onlyQueryTags(&query) ) {
STblCond *pCond = tsGetTableFilter(pQueryInfo->colCond, pTableMeta->id.uid, 0);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->colCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
}
}
for (int32_t i = 0; i < query.numOfOutput; ++i) {
......@@ -1035,7 +1063,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
if (pCond != NULL && pCond->cond != NULL) {
pQueryMsg->tagCondLen = htonl(pCond->len);
pQueryMsg->tagCondLen = htons(pCond->len);
memcpy(pMsg, pCond->cond, pCond->len);
pMsg += pCond->len;
......
......@@ -196,6 +196,11 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code;
if (terrno ==TSDB_CODE_RPC_FQDN_ERROR) {
printf("taos connect failed, reason: %s\n\n", taos_errstr(pSql));
} else {
printf("taos connect failed, reason: %s.\n\n", tstrerror(terrno));
}
taos_free_result(pSql);
taos_close(pObj);
return NULL;
......@@ -643,7 +648,7 @@ char *taos_errstr(TAOS_RES *tres) {
return (char*) tstrerror(terrno);
}
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd) || pSql->res.code == TSDB_CODE_RPC_FQDN_ERROR) {
return pSql->cmd.payload;
} else {
return (char*)tstrerror(pSql->res.code);
......
......@@ -796,6 +796,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
STimeWindow window = pQueryInfo->window;
tscInitQueryInfo(pQueryInfo);
pQueryInfo->colCond = pSupporter->colCond;
pQueryInfo->window = window;
TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
......@@ -1883,6 +1884,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag
SColumnIndex colIndex = {0};
pSupporter->colCond = pNewQueryInfo->colCond;
pNewQueryInfo->colCond = NULL;
STagCond* pTagCond = &pSupporter->tagCond;
assert(pTagCond->joinInfo.hasJoin);
......@@ -2319,6 +2323,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
goto _error;
}
if (tscColCondCopy(&pNewQueryInfo->colCond, pQueryInfo->colCond, pTableMetaInfo->pTableMeta->id.uid, 0) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pNewQueryInfo->window = pQueryInfo->window;
pNewQueryInfo->interval = pQueryInfo->interval;
pNewQueryInfo->sessionWindow = pQueryInfo->sessionWindow;
......
......@@ -62,11 +62,11 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
n = sprintf(str, "%e", GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = sprintf(str, "%f", GET_DOUBLE_VAL(buf));
n = sprintf(str, "%e", GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
......@@ -82,6 +82,22 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
n = bufSize + 2;
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%d", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%u", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
break;
default:
tscError("unsupported type:%d", type);
return TSDB_CODE_TSC_INVALID_VALUE;
......@@ -118,6 +134,24 @@ SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
return NULL;
}
STblCond* tsGetTableFilter(SArray* filters, uint64_t uid, int16_t idx) {
if (filters == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(filters);
for (int32_t i = 0; i < size; ++i) {
STblCond* cond = taosArrayGet(filters, i);
if (uid == cond->uid && (idx >= 0 && cond->idx == idx)) {
return cond;
}
}
return NULL;
}
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
if (tbufTell(bw) == 0) {
return;
......@@ -753,8 +787,7 @@ typedef struct SDummyInputInfo {
SSDataBlock *block;
STableQueryInfo *pTableQueryInfo;
SSqlObj *pSql; // refactor: remove it
int32_t numOfFilterCols;
SSingleColumnFilterInfo *pFilterInfo;
SFilterInfo *pFilterInfo;
} SDummyInputInfo;
typedef struct SJoinStatus {
......@@ -770,38 +803,7 @@ typedef struct SJoinOperatorInfo {
SRspResultInfo resultInfo; // todo refactor, add this info for each operator
} SJoinOperatorInfo;
static void converNcharFilterColumn(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t rows, bool *gotNchar) {
for (int32_t i = 0; i < numOfFilterCols; ++i) {
if (pFilterInfo[i].info.type == TSDB_DATA_TYPE_NCHAR) {
pFilterInfo[i].pData2 = pFilterInfo[i].pData;
pFilterInfo[i].pData = malloc(rows * pFilterInfo[i].info.bytes);
int32_t bufSize = pFilterInfo[i].info.bytes - VARSTR_HEADER_SIZE;
for (int32_t j = 0; j < rows; ++j) {
char* dst = (char *)pFilterInfo[i].pData + j * pFilterInfo[i].info.bytes;
char* src = (char *)pFilterInfo[i].pData2 + j * pFilterInfo[i].info.bytes;
int32_t len = 0;
taosMbsToUcs4(varDataVal(src), varDataLen(src), varDataVal(dst), bufSize, &len);
varDataLen(dst) = len;
}
*gotNchar = true;
}
}
}
static void freeNcharFilterColumn(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
for (int32_t i = 0; i < numOfFilterCols; ++i) {
if (pFilterInfo[i].info.type == TSDB_DATA_TYPE_NCHAR) {
if (pFilterInfo[i].pData2) {
tfree(pFilterInfo[i].pData);
pFilterInfo[i].pData = pFilterInfo[i].pData2;
pFilterInfo[i].pData2 = NULL;
}
}
}
}
static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
int32_t offset = 0;
char* pData = pRes->data;
......@@ -817,14 +819,16 @@ static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnF
}
// filter data if needed
if (numOfFilterCols > 0) {
doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock);
if (pFilterInfo) {
//doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock);
doSetFilterColInfo(pFilterInfo, pBlock);
bool gotNchar = false;
converNcharFilterColumn(pFilterInfo, numOfFilterCols, pBlock->info.rows, &gotNchar);
filterConverNcharColumns(pFilterInfo, pBlock->info.rows, &gotNchar);
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p);
//bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p);
bool all = filterExecute(pFilterInfo, pBlock->info.rows, p);
if (gotNchar) {
freeNcharFilterColumn(pFilterInfo, numOfFilterCols);
filterFreeNcharColumns(pFilterInfo);
}
if (!all) {
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
......@@ -862,7 +866,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
pBlock->info.rows = pRes->numOfRows;
if (pRes->numOfRows != 0) {
doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols);
doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo);
*newgroup = false;
return pBlock;
}
......@@ -877,7 +881,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
}
pBlock->info.rows = pRes->numOfRows;
doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols);
doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo);
*newgroup = false;
return pBlock;
}
......@@ -920,25 +924,40 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
SJoinStatus* st0 = &pJoinInfo->status[0];
SColumnInfoData* p0 = taosArrayGet(st0->pBlock->pDataBlock, 0);
int64_t* ts0 = (int64_t*) p0->pData;
if (st0->index >= st0->pBlock->info.rows) {
continue;
}
bool prefixEqual = true;
while(1) {
prefixEqual = true;
for (int32_t i = 1; i < pJoinInfo->numOfUpstream; ++i) {
SJoinStatus* st = &pJoinInfo->status[i];
ts0 = (int64_t*) p0->pData;
SColumnInfoData* p = taosArrayGet(st->pBlock->pDataBlock, 0);
int64_t* ts = (int64_t*)p->pData;
if (st->index >= st->pBlock->info.rows || st0->index >= st0->pBlock->info.rows) {
fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
}
prefixEqual = false;
break;
}
if (ts[st->index] < ts0[st0->index]) { // less than the first
prefixEqual = false;
if ((++(st->index)) >= st->pBlock->info.rows) {
if ((++(st->index)) >= st->pBlock->info.rows) {
fetchNextBlockIfCompleted(pOperator, newgroup);
if (pOperator->status == OP_EXEC_DONE) {
return pJoinInfo->pRes;
......@@ -1053,22 +1072,21 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
pInfo->block = destroyOutputBuf(pInfo->block);
pInfo->pSql = NULL;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
filterFreeInfo(pInfo->pFilterInfo);
cleanupResultRowInfo(&pInfo->pTableQueryInfo->resInfo);
tfree(pInfo->pTableQueryInfo);
}
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SFilterInfo* pFilters) {
assert(numOfCols > 0);
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pSql = pSql;
pInfo->pFilterInfo = pFilterInfo;
pInfo->numOfFilterCols = numOfFilterCols;
pInfo->pFilterInfo = pFilters;
pInfo->pTableQueryInfo = createTmpTableQueryInfo(win);
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
......@@ -1156,6 +1174,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, b
pRes->completed = (pRes->numOfRows == 0);
}
/*
static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t* numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo) {
SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCol1; ++i) {
......@@ -1173,6 +1192,7 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t
tfree(tableCols);
}
*/
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) {
SSqlRes* pOutput = &pSql->res;
......@@ -1201,11 +1221,17 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
// if it is a join query, create join operator here
int32_t numOfCol1 = pTableMeta->tableInfo.numOfColumns;
int32_t numOfFilterCols = 0;
SSingleColumnFilterInfo* pFilterInfo = NULL;
createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
SFilterInfo *pFilters = NULL;
STblCond *pCond = NULL;
if (px->colCond) {
pCond = tsGetTableFilter(px->colCond, pTableMeta->id.uid, 0);
if (pCond && pCond->cond) {
createQueryFilter(pCond->cond, pCond->len, &pFilters);
}
//createInputDataFlterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
}
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilters);
pOutput->precision = pSqlObjList[0]->res.precision;
......@@ -1222,15 +1248,21 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
for(int32_t i = 1; i < px->numOfTables; ++i) {
STableMeta* pTableMeta1 = tscGetMetaInfo(px, i)->pTableMeta;
numOfCol1 = pTableMeta1->tableInfo.numOfColumns;
SFilterInfo *pFilters1 = NULL;
SSchema* pSchema1 = tscGetTableSchema(pTableMeta1);
int32_t n = pTableMeta1->tableInfo.numOfColumns;
int32_t numOfFilterCols1 = 0;
SSingleColumnFilterInfo* pFilterInfo1 = NULL;
createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols1, &pFilterInfo1);
if (px->colCond) {
pCond = tsGetTableFilter(px->colCond, pTableMeta1->id.uid, i);
if (pCond && pCond->cond) {
createQueryFilter(pCond->cond, pCond->len, &pFilters1);
}
//createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols1, &pFilterInfo1);
}
p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1);
p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilters1);
memcpy(&schema[offset], pSchema1, n * sizeof(SSchema));
offset += n;
}
......@@ -2258,6 +2290,11 @@ int32_t tscGetResRowLength(SArray* pExprList) {
}
static void destroyFilterInfo(SColumnFilterList* pFilterList) {
if (pFilterList->filterInfo == NULL) {
pFilterList->numOfFilters = 0;
return;
}
for(int32_t i = 0; i < pFilterList->numOfFilters; ++i) {
if (pFilterList->filterInfo[i].filterstr) {
tfree(pFilterList->filterInfo[i].pz);
......@@ -2960,6 +2997,64 @@ int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
return 0;
}
int32_t tscColCondCopy(SArray** dest, const SArray* src, uint64_t uid, int16_t tidx) {
if (src == NULL) {
return 0;
}
size_t s = taosArrayGetSize(src);
*dest = taosArrayInit(s, sizeof(SCond));
for (int32_t i = 0; i < s; ++i) {
STblCond* pCond = taosArrayGet(src, i);
STblCond c = {0};
if (tidx > 0) {
if (!(pCond->uid == uid && pCond->idx == tidx)) {
continue;
}
c.idx = 0;
} else {
c.idx = pCond->idx;
}
c.len = pCond->len;
c.uid = pCond->uid;
if (pCond->len > 0) {
assert(pCond->cond != NULL);
c.cond = malloc(c.len);
if (c.cond == NULL) {
return -1;
}
memcpy(c.cond, pCond->cond, c.len);
}
taosArrayPush(*dest, &c);
}
return 0;
}
void tscColCondRelease(SArray** pCond) {
if (*pCond == NULL) {
return;
}
size_t s = taosArrayGetSize(*pCond);
for (int32_t i = 0; i < s; ++i) {
STblCond* p = taosArrayGet(*pCond, i);
tfree(p->cond);
}
taosArrayDestroy(*pCond);
*pCond = NULL;
}
void tscTagCondRelease(STagCond* pTagCond) {
free(pTagCond->tbnameCond.cond);
......@@ -3152,6 +3247,7 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond);
tscColCondRelease(&pQueryInfo->colCond);
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
tscExprDestroy(pQueryInfo->exprList);
......@@ -3242,6 +3338,11 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
goto _error;
}
if (tscColCondCopy(&pQueryInfo->colCond, pSrc->colCond, 0, -1) != 0) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pSrc->fillType != TSDB_FILL_NONE) {
pQueryInfo->fillVal = calloc(1, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) {
......@@ -3634,6 +3735,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
goto _error;
}
if (tscColCondCopy(&pNewQueryInfo->colCond, pQueryInfo->colCond, pTableMetaInfo->pTableMeta->id.uid, tableIndex) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
//just make memory memory sanitizer happy
//refactor later
......
......@@ -328,11 +328,10 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
// Get the data pointer from a column-wised data
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
......@@ -357,13 +356,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfRows;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
int maxCols; // max number of columns
int maxPoints; // max number of points
int numOfRows;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
SDataCol *cols;
} SDataCols;
......@@ -407,7 +404,7 @@ static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) {
}
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
SDataCols *tdNewDataCols(int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
......
......@@ -70,6 +70,7 @@ extern int8_t tsKeepOriginalColumnName;
// client
extern int32_t tsMaxSQLStringLen;
extern int32_t tsMaxWildCardsLen;
extern int8_t tsTscEnableRecordSql;
extern int32_t tsMaxNumOfOrderedResults;
extern int32_t tsMinSlidingTime;
......
......@@ -53,6 +53,8 @@ int32_t tVariantToString(tVariant *pVar, char *dst);
int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix);
int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix, bool *converted, char *extInfo);
int32_t tVariantTypeSetType(tVariant *pVariant, char type);
#ifdef __cplusplus
......
......@@ -19,10 +19,10 @@
#include "wchar.h"
#include "tarray.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
//TODO: change caller to use return val
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
if(IS_VAR_DATA_TYPE(pCol->type)) {
......@@ -31,7 +31,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
if(ptr == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize,
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
strerror(errno));
return -1;
} else {
......@@ -239,20 +239,19 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0;
}
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
return 0;
}
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
} else {
tdAllocMemForCol(pCol, maxPoints);
dataColSetNEleNull(pCol, numOfRows);
}
}
......@@ -268,12 +267,21 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
return 0;
}
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else {
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
}
}
bool isNEleNull(SDataCol *pCol, int nEle) {
if(isAllRowsNull(pCol)) return true;
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
}
return true;
}
......@@ -290,9 +298,7 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
}
}
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
tdAllocMemForCol(pCol, maxPoints);
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
......@@ -318,7 +324,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
}
}
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
......@@ -326,6 +332,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
}
pCols->maxPoints = maxRows;
pCols->maxCols = maxCols;
pCols->numOfRows = 0;
pCols->numOfCols = 0;
if (maxCols > 0) {
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
......@@ -342,13 +351,8 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
}
pCols->maxCols = maxCols;
}
pCols->maxRowSize = maxRowSize;
return pCols;
}
......@@ -357,8 +361,9 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
int oldMaxCols = pCols->maxCols;
if (schemaNCols(pSchema) > oldMaxCols) {
pCols->maxCols = schemaNCols(pSchema);
pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (pCols->cols == NULL) return -1;
void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (ptr == NULL) return -1;
pCols->cols = ptr;
for(i = oldMaxCols; i < pCols->maxCols; i++) {
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
......@@ -366,10 +371,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
}
}
if (schemaTLen(pSchema) > pCols->maxRowSize) {
pCols->maxRowSize = schemaTLen(pSchema);
}
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
......@@ -398,7 +399,7 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) {
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
......@@ -413,7 +414,10 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
if (keepData) {
if (pDataCols->cols[i].len > 0) {
tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints);
if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
tdFreeDataCols(pRet);
return NULL;
}
pRet->cols[i].len = pDataCols->cols[i].len;
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
......@@ -584,9 +588,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) {
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
}
target->numOfRows++;
......
......@@ -118,7 +118,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
free(pNode->pSchema);
tfree(pNode->pSchema);
}
free(pNode);
......@@ -435,7 +435,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
expr->_node.optr = TSDB_RELATION_IN;
tVariant* pVal = exception_calloc(1, sizeof(tVariant));
right->pVal = pVal;
pVal->nType = TSDB_DATA_TYPE_ARRAY;
pVal->nType = TSDB_DATA_TYPE_POINTER_ARRAY;
pVal->arr = taosArrayInit(2, POINTER_BYTES);
const char* cond = tbnameCond + QUERY_COND_REL_PREFIX_IN_LEN;
......@@ -502,6 +502,183 @@ void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
*q = (void *)pObj;
}
void convertFilterSetFromBinary(void **q, const char *buf, int32_t len, uint32_t tType) {
SBufferReader br = tbufInitReader(buf, len, false);
uint32_t sType = tbufReadUint32(&br);
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(tType), true, false);
taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(tType));
int dummy = -1;
tVariant tmpVar = {0};
size_t t = 0;
int32_t sz = tbufReadInt32(&br);
void *pvar = NULL;
int64_t val = 0;
int32_t bufLen = 0;
if (IS_NUMERIC_TYPE(sType)) {
bufLen = 60; // The maximum length of string that a number is converted to.
} else {
bufLen = 128;
}
char *tmp = calloc(1, bufLen * TSDB_NCHAR_SIZE);
for (int32_t i = 0; i < sz; i++) {
switch (sType) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
*(uint8_t *)&val = (uint8_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
*(uint16_t *)&val = (uint16_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
*(uint32_t *)&val = (uint32_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
*(uint64_t *)&val = (uint64_t)tbufReadInt64(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double *)&val = tbufReadDouble(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(float *)&val = (float)tbufReadDouble(&br);
t = sizeof(val);
pvar = &val;
break;
}
case TSDB_DATA_TYPE_BINARY: {
pvar = (char *)tbufReadBinary(&br, &t);
break;
}
case TSDB_DATA_TYPE_NCHAR: {
pvar = (char *)tbufReadBinary(&br, &t);
break;
}
default:
taosHashCleanup(pObj);
*q = NULL;
return;
}
tVariantCreateFromBinary(&tmpVar, (char *)pvar, t, sType);
if (bufLen < t) {
tmp = realloc(tmp, t * TSDB_NCHAR_SIZE);
bufLen = (int32_t)t;
}
switch (tType) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
if (tVariantDump(&tmpVar, (char *)&val, tType, false)) {
goto err_ret;
}
pvar = &val;
t = sizeof(val);
break;
}
case TSDB_DATA_TYPE_BINARY: {
if (tVariantDump(&tmpVar, tmp, tType, true)) {
goto err_ret;
}
t = varDataLen(tmp);
pvar = varDataVal(tmp);
break;
}
case TSDB_DATA_TYPE_NCHAR: {
if (tVariantDump(&tmpVar, tmp, tType, true)) {
goto err_ret;
}
t = varDataLen(tmp);
pvar = varDataVal(tmp);
break;
}
default:
goto err_ret;
}
taosHashPut(pObj, (char *)pvar, t, &dummy, sizeof(dummy));
tVariantDestroy(&tmpVar);
memset(&tmpVar, 0, sizeof(tmpVar));
}
*q = (void *)pObj;
pObj = NULL;
err_ret:
tVariantDestroy(&tmpVar);
taosHashCleanup(pObj);
tfree(tmp);
}
tExprNode* exprdup(tExprNode* pNode) {
if (pNode == NULL) {
return NULL;
......
......@@ -25,6 +25,7 @@
#include "tutil.h"
#include "tlocale.h"
#include "ttimezone.h"
#include "tcompare.h"
// cluster
char tsFirst[TSDB_EP_LEN] = {0};
......@@ -75,6 +76,7 @@ int32_t tsCompressMsgSize = -1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_MAX_LEN;
int8_t tsTscEnableRecordSql = 0;
// the maximum number of results for projection query on super table that are returned from
......@@ -984,6 +986,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
cfg.option = "maxWildCardsLength";
cfg.ptr = &tsMaxWildCardsLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosInitConfigOption(cfg);
cfg.option = "maxNumOfOrderedRes";
cfg.ptr = &tsMaxNumOfOrderedResults;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -1531,6 +1543,7 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
assert(tsGlobalConfigNum <= TSDB_CFG_MAX_NUM);
#ifdef TD_TSZ
// lossy compress
cfg.option = "lossyColumns";
......
......@@ -61,7 +61,7 @@ bool tscValidateTableNameLength(size_t len) {
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
if (numOfFilters == 0 || src == NULL) {
assert(src == NULL);
return NULL;
}
......
......@@ -372,21 +372,21 @@ static void getStatics_nchr(const void *pData, int32_t numOfRow, int64_t *min, i
}
tDataTypeDescriptor tDataTypes[15] = {
{TSDB_DATA_TYPE_NULL, 6,1, "NOTYPE", NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", tsCompressInt, tsDecompressInt, getStatics_i32},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", tsCompressBigint, tsDecompressBigint, getStatics_u64},
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", 0, 0, NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", false, true, tsCompressBool, tsDecompressBool, getStatics_bool},
{TSDB_DATA_TYPE_TINYINT, 7, CHAR_BYTES, "TINYINT", INT8_MIN, INT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_i8},
{TSDB_DATA_TYPE_SMALLINT, 8, SHORT_BYTES, "SMALLINT", INT16_MIN, INT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_i16},
{TSDB_DATA_TYPE_INT, 3, INT_BYTES, "INT", INT32_MIN, INT32_MAX, tsCompressInt, tsDecompressInt, getStatics_i32},
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", INT64_MIN, INT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, tsDecompressSmallint, getStatics_u16},
{TSDB_DATA_TYPE_UINT, 12, INT_BYTES, "INT UNSIGNED", 0, UINT32_MAX, tsCompressInt, tsDecompressInt, getStatics_u32},
{TSDB_DATA_TYPE_UBIGINT, 15, LONG_BYTES, "BIGINT UNSIGNED", 0, UINT64_MAX, tsCompressBigint, tsDecompressBigint, getStatics_u64},
};
char tTokenTypeSwitcher[13] = {
......@@ -405,6 +405,32 @@ char tTokenTypeSwitcher[13] = {
TSDB_DATA_TYPE_NCHAR, // TK_NCHAR
};
float floatMin = -FLT_MAX, floatMax = FLT_MAX;
double doubleMin = -DBL_MAX, doubleMax = DBL_MAX;
FORCE_INLINE void* getDataMin(int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
return &floatMin;
case TSDB_DATA_TYPE_DOUBLE:
return &doubleMin;
default:
return &tDataTypes[type].minValue;
}
}
FORCE_INLINE void* getDataMax(int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
return &floatMax;
case TSDB_DATA_TYPE_DOUBLE:
return &doubleMax;
default:
return &tDataTypes[type].maxValue;
}
}
bool isValidDataType(int32_t type) {
return type >= TSDB_DATA_TYPE_NULL && type <= TSDB_DATA_TYPE_UBIGINT;
}
......@@ -566,6 +592,53 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
}
}
void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
if (optr == TSDB_BINARY_OP_ADD) {
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
*((int8_t *)dst) = GET_INT8_VAL(s1) + GET_INT8_VAL(s2);
break;
case TSDB_DATA_TYPE_UTINYINT:
*((uint8_t *)dst) = GET_UINT8_VAL(s1) + GET_UINT8_VAL(s2);
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t *)dst) = GET_INT16_VAL(s1) + GET_INT16_VAL(s2);
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t *)dst) = GET_UINT16_VAL(s1) + GET_UINT16_VAL(s2);
break;
case TSDB_DATA_TYPE_INT:
*((int32_t *)dst) = GET_INT32_VAL(s1) + GET_INT32_VAL(s2);
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t *)dst) = GET_UINT32_VAL(s1) + GET_UINT32_VAL(s2);
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t *)dst) = GET_INT64_VAL(s1) + GET_INT64_VAL(s2);
break;
case TSDB_DATA_TYPE_UBIGINT:
*((uint64_t *)dst) = GET_UINT64_VAL(s1) + GET_UINT64_VAL(s2);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
*((int64_t *)dst) = GET_INT64_VAL(s1) + GET_INT64_VAL(s2);
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_VAL(dst, GET_FLOAT_VAL(s1) + GET_FLOAT_VAL(s2));
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_VAL(dst, GET_DOUBLE_VAL(s1) + GET_DOUBLE_VAL(s2));
break;
default: {
assert(0);
break;
}
}
} else {
assert(0);
}
}
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf) {
switch (type) {
case TSDB_DATA_TYPE_INT:
......
......@@ -23,6 +23,13 @@
#include "tutil.h"
#include "tvariant.h"
#define SET_EXT_INFO(converted, res, minv, maxv, exti) do { \
if (converted == NULL || exti == NULL || *converted == false) { break; } \
if ((res) < (minv)) { *exti = -1; break; } \
if ((res) > (maxv)) { *exti = 1; break; } \
assert(0); \
} while (0)
void tVariantCreate(tVariant *pVar, SStrToken *token) {
int32_t ret = 0;
int32_t type = token->type;
......@@ -184,7 +191,7 @@ void tVariantDestroy(tVariant *pVar) {
}
// NOTE: this is only for string array
if (pVar->nType == TSDB_DATA_TYPE_ARRAY) {
if (pVar->nType == TSDB_DATA_TYPE_POINTER_ARRAY) {
size_t num = taosArrayGetSize(pVar->arr);
for(size_t i = 0; i < num; i++) {
void* p = taosArrayGetP(pVar->arr, i);
......@@ -192,6 +199,9 @@ void tVariantDestroy(tVariant *pVar) {
}
taosArrayDestroy(pVar->arr);
pVar->arr = NULL;
} else if (pVar->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
taosArrayDestroy(pVar->arr);
pVar->arr = NULL;
}
}
......@@ -220,7 +230,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
if (IS_NUMERIC_TYPE(pSrc->nType) || (pSrc->nType == TSDB_DATA_TYPE_BOOL)) {
pDst->i64 = pSrc->i64;
} else if (pSrc->nType == TSDB_DATA_TYPE_ARRAY) { // this is only for string array
} else if (pSrc->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { // this is only for string array
size_t num = taosArrayGetSize(pSrc->arr);
pDst->arr = taosArrayInit(num, sizeof(char*));
for(size_t i = 0; i < num; i++) {
......@@ -228,9 +238,18 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
char* n = strdup(p);
taosArrayPush(pDst->arr, &n);
}
} else if (pSrc->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
size_t num = taosArrayGetSize(pSrc->arr);
pDst->arr = taosArrayInit(num, sizeof(int64_t));
pDst->nLen = pSrc->nLen;
assert(pSrc->nLen == num);
for(size_t i = 0; i < num; i++) {
int64_t *p = taosArrayGet(pSrc->arr, i);
taosArrayPush(pDst->arr, p);
}
}
if (pDst->nType != TSDB_DATA_TYPE_ARRAY) {
if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) {
pDst->nLen = tDataTypes[pDst->nType].bytes;
}
}
......@@ -450,7 +469,7 @@ static FORCE_INLINE int32_t convertToDouble(char *pStr, int32_t len, double *val
return 0;
}
static FORCE_INLINE int32_t convertToInteger(tVariant *pVariant, int64_t *result, int32_t type, bool issigned, bool releaseVariantPtr) {
static FORCE_INLINE int32_t convertToInteger(tVariant *pVariant, int64_t *result, int32_t type, bool issigned, bool releaseVariantPtr, bool *converted) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
setNull((char *)result, type, tDataTypes[type].bytes);
return 0;
......@@ -540,6 +559,10 @@ static FORCE_INLINE int32_t convertToInteger(tVariant *pVariant, int64_t *result
}
}
if (converted) {
*converted = true;
}
bool code = false;
uint64_t ui = 0;
......@@ -602,6 +625,18 @@ static int32_t convertToBool(tVariant *pVariant, int64_t *pDest) {
* to column type defined in schema
*/
int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix) {
return tVariantDumpEx(pVariant, payload, type, includeLengthPrefix, NULL, NULL);
}
/*
* transfer data from variant serve as the implicit data conversion: from input sql string pVariant->nType
* to column type defined in schema
*/
int32_t tVariantDumpEx(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix, bool *converted, char *extInfo) {
if (converted) {
*converted = false;
}
if (pVariant == NULL || (pVariant->nType != 0 && !isValidDataType(pVariant->nType))) {
return -1;
}
......@@ -620,7 +655,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_TINYINT: {
if (convertToInteger(pVariant, &result, type, true, false) < 0) {
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
SET_EXT_INFO(converted, result, INT8_MIN + 1, INT8_MAX, extInfo);
return -1;
}
*((int8_t *)payload) = (int8_t) result;
......@@ -628,7 +664,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_UTINYINT: {
if (convertToInteger(pVariant, &result, type, false, false) < 0) {
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
SET_EXT_INFO(converted, result, 0, UINT8_MAX - 1, extInfo);
return -1;
}
*((uint8_t *)payload) = (uint8_t) result;
......@@ -636,7 +673,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_SMALLINT: {
if (convertToInteger(pVariant, &result, type, true, false) < 0) {
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
SET_EXT_INFO(converted, result, INT16_MIN + 1, INT16_MAX, extInfo);
return -1;
}
*((int16_t *)payload) = (int16_t)result;
......@@ -644,7 +682,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_USMALLINT: {
if (convertToInteger(pVariant, &result, type, false, false) < 0) {
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
SET_EXT_INFO(converted, result, 0, UINT16_MAX - 1, extInfo);
return -1;
}
*((uint16_t *)payload) = (uint16_t)result;
......@@ -652,7 +691,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_INT: {
if (convertToInteger(pVariant, &result, type, true, false) < 0) {
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
SET_EXT_INFO(converted, result, INT32_MIN + 1, INT32_MAX, extInfo);
return -1;
}
*((int32_t *)payload) = (int32_t)result;
......@@ -660,7 +700,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_UINT: {
if (convertToInteger(pVariant, &result, type, false, false) < 0) {
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
SET_EXT_INFO(converted, result, 0, UINT32_MAX - 1, extInfo);
return -1;
}
*((uint32_t *)payload) = (uint32_t)result;
......@@ -668,7 +709,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_BIGINT: {
if (convertToInteger(pVariant, &result, type, true, false) < 0) {
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
SET_EXT_INFO(converted, (int64_t)result, INT64_MIN + 1, INT64_MAX, extInfo);
return -1;
}
*((int64_t *)payload) = (int64_t)result;
......@@ -676,7 +718,8 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
}
case TSDB_DATA_TYPE_UBIGINT: {
if (convertToInteger(pVariant, &result, type, false, false) < 0) {
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
SET_EXT_INFO(converted, (uint64_t)result, 0, UINT64_MAX - 1, extInfo);
return -1;
}
*((uint64_t *)payload) = (uint64_t)result;
......@@ -696,11 +739,37 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return -1;
}
if (converted) {
*converted = true;
}
if (value > FLT_MAX || value < -FLT_MAX) {
SET_EXT_INFO(converted, value, -FLT_MAX, FLT_MAX, extInfo);
return -1;
}
SET_FLOAT_VAL(payload, value);
}
} else if (pVariant->nType == TSDB_DATA_TYPE_BOOL || IS_SIGNED_NUMERIC_TYPE(pVariant->nType) || IS_UNSIGNED_NUMERIC_TYPE(pVariant->nType)) {
if (converted) {
*converted = true;
}
if (pVariant->i64 > FLT_MAX || pVariant->i64 < -FLT_MAX) {
SET_EXT_INFO(converted, pVariant->i64, -FLT_MAX, FLT_MAX, extInfo);
return -1;
}
SET_FLOAT_VAL(payload, pVariant->i64);
} else if (IS_FLOAT_TYPE(pVariant->nType)) {
if (converted) {
*converted = true;
}
if (pVariant->dKey > FLT_MAX || pVariant->dKey < -FLT_MAX) {
SET_EXT_INFO(converted, pVariant->dKey, -FLT_MAX, FLT_MAX, extInfo);
return -1;
}
SET_FLOAT_VAL(payload, pVariant->dKey);
} else if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*((uint32_t *)payload) = TSDB_DATA_FLOAT_NULL;
......@@ -824,6 +893,7 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
return 0;
}
/*
* In variant, bool/smallint/tinyint/int/bigint share the same attribution of
* structure, also ignore the convert the type required
......@@ -848,7 +918,7 @@ int32_t tVariantTypeSetType(tVariant *pVariant, char type) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT: {
convertToInteger(pVariant, &(pVariant->i64), type, true, true);
convertToInteger(pVariant, &(pVariant->i64), type, true, true, NULL);
pVariant->nType = TSDB_DATA_TYPE_BIGINT;
break;
}
......
......@@ -113,7 +113,6 @@
</includes>
<excludes>
<exclude>**/AppMemoryLeakTest.java</exclude>
<exclude>**/AuthenticationTest.java</exclude>
<exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude>
<exclude>**/DatetimeBefore1970Test.java</exclude>
<exclude>**/FailOverTest.java</exclude>
......
......@@ -14,6 +14,8 @@
*****************************************************************************/
package com.taosdata.jdbc;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;
import java.util.logging.Logger;
......@@ -127,6 +129,11 @@ public class TSDBDriver extends AbstractDriver {
return null;
}
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_USER))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED);
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_PASSWORD))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED);
try {
TSDBJNIConnector.init((String) props.get(PROPERTY_KEY_CONFIG_DIR), (String) props.get(PROPERTY_KEY_LOCALE),
(String) props.get(PROPERTY_KEY_CHARSET), (String) props.get(PROPERTY_KEY_TIME_ZONE));
......
......@@ -33,6 +33,8 @@ public class TSDBError {
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_NUMERIC_VALUE_OUT_OF_RANGE, "numeric value out of range");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN_TAOS_TYPE, "unknown taos type in tdengine");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN_TIMESTAMP_PRECISION, "unknown timestamp precision");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, "user is required");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, "password is required");
TSDBErrorMap.put(TSDBErrorNumbers.ERROR_UNKNOWN, "unknown error");
......
......@@ -29,6 +29,9 @@ public class TSDBErrorNumbers {
public static final int ERROR_UNKNOWN_TIMESTAMP_PRECISION = 0x2316; // unknown timestamp precision
public static final int ERROR_RESTFul_Client_Protocol_Exception = 0x2317;
public static final int ERROR_RESTFul_Client_IOException = 0x2318;
public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required
public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required
public static final int ERROR_UNKNOWN = 0x2350; //unknown error
......@@ -67,6 +70,8 @@ public class TSDBErrorNumbers {
errorNumbers.add(ERROR_UNKNOWN_TAOS_TYPE);
errorNumbers.add(ERROR_UNKNOWN_TIMESTAMP_PRECISION);
errorNumbers.add(ERROR_RESTFul_Client_IOException);
errorNumbers.add(ERROR_USER_IS_REQUIRED);
errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED);
errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception);
......
......@@ -36,7 +36,6 @@ public class TSDBJNIConnector {
static {
System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path"));
}
public boolean isClosed() {
......
......@@ -7,6 +7,7 @@ import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.Properties;
import java.util.logging.Logger;
......@@ -40,8 +41,13 @@ public class RestfulDriver extends AbstractDriver {
String loginUrl = "http://" + host + ":" + port + "/rest/login/" + props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + "/" + props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD) + "";
try {
String user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), "UTF-8");
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), "UTF-8");
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_USER))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED);
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_PASSWORD))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED);
String user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), StandardCharsets.UTF_8.displayName());
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), StandardCharsets.UTF_8.displayName());
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
......
......@@ -7,6 +7,7 @@ import com.taosdata.jdbc.AbstractStatement;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.enums.TimestampFormat;
import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
......@@ -45,9 +46,7 @@ public class RestfulStatement extends AbstractStatement {
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
return executeOneUpdate(url, sql);
return executeOneUpdate(sql);
}
@Override
......@@ -62,34 +61,25 @@ public class RestfulStatement extends AbstractStatement {
public boolean execute(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecute(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE, "not a valid sql for execute: " + sql);
//如果执行了use操作应该将当前Statement的catalog设置为新的database
boolean result = true;
String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
if (conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).equals("TIMESTAMP")) {
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
}
if (conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).equals("UTC")) {
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
}
if (SqlSyntaxValidator.isUseSql(sql)) {
HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
this.database = sql.trim().replace("use", "").trim();
this.conn.setCatalog(this.database);
result = false;
} else if (SqlSyntaxValidator.isDatabaseUnspecifiedQuery(sql)) {
executeOneQuery(sql);
} else if (SqlSyntaxValidator.isDatabaseUnspecifiedUpdate(sql)) {
executeOneUpdate(url, sql);
executeOneUpdate(sql);
result = false;
} else {
if (SqlSyntaxValidator.isValidForExecuteQuery(sql)) {
executeQuery(sql);
executeOneQuery(sql);
} else {
executeUpdate(sql);
executeOneUpdate(sql);
result = false;
}
}
......@@ -97,19 +87,25 @@ public class RestfulStatement extends AbstractStatement {
return result;
}
private ResultSet executeOneQuery(String sql) throws SQLException {
if (!SqlSyntaxValidator.isValidForExecuteQuery(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_QUERY, "not a valid sql for executeQuery: " + sql);
private String getUrl() throws SQLException {
TimestampFormat timestampFormat = TimestampFormat.valueOf(conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT).trim().toUpperCase());
String url;
switch (timestampFormat) {
case TIMESTAMP:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
break;
case UTC:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
break;
default:
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
}
return url;
}
private ResultSet executeOneQuery(String sql) throws SQLException {
// row data
String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
String timestampFormat = conn.getClientInfo(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT);
if ("TIMESTAMP".equalsIgnoreCase(timestampFormat))
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlt";
if ("UTC".equalsIgnoreCase(timestampFormat))
url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sqlutc";
String result = HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
......@@ -119,11 +115,8 @@ public class RestfulStatement extends AbstractStatement {
return resultSet;
}
private int executeOneUpdate(String url, String sql) throws SQLException {
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
String result = HttpClientPoolUtil.execute(url, sql, this.conn.getToken());
private int executeOneUpdate(String sql) throws SQLException {
String result = HttpClientPoolUtil.execute(getUrl(), sql, this.conn.getToken());
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw TSDBError.createSQLException(jsonObject.getInteger("code"), jsonObject.getString("desc"));
......@@ -134,7 +127,7 @@ public class RestfulStatement extends AbstractStatement {
}
private int getAffectedRows(JSONObject jsonObject) throws SQLException {
// create ... SQLs should return 0 , and Restful result is this:
// create ... SQLs should return 0 , and Restful result like this:
// {"status": "succ", "head": ["affected_rows"], "data": [[0]], "rows": 1}
JSONArray head = jsonObject.getJSONArray("head");
if (head.size() != 1 || !"affected_rows".equals(head.getString(0)))
......
......@@ -16,8 +16,7 @@ package com.taosdata.jdbc.utils;
public class SqlSyntaxValidator {
private static final String[] SQL = {"select", "insert", "import", "create", "use", "alter", "drop", "set", "show", "describe", "reset"};
private static final String[] updateSQL = {"insert", "import", "create", "use", "alter", "drop", "set"};
private static final String[] updateSQL = {"insert", "import", "create", "use", "alter", "drop", "set", "reset"};
private static final String[] querySQL = {"select", "show", "describe"};
private static final String[] databaseUnspecifiedShow = {"databases", "dnodes", "mnodes", "variables"};
......@@ -38,14 +37,6 @@ public class SqlSyntaxValidator {
return false;
}
public static boolean isValidForExecute(String sql) {
for (String prefix : SQL) {
if (sql.trim().toLowerCase().startsWith(prefix))
return true;
}
return false;
}
public static boolean isDatabaseUnspecifiedQuery(String sql) {
for (String databaseObj : databaseUnspecifiedShow) {
if (sql.trim().toLowerCase().matches("show\\s+" + databaseObj + ".*"))
......@@ -63,9 +54,5 @@ public class SqlSyntaxValidator {
return sql.trim().toLowerCase().startsWith("use");
}
public static boolean isSelectSql(String sql) {
return sql.trim().toLowerCase().startsWith("select");
}
}
......@@ -69,6 +69,8 @@ public class SubscribeTest {
@Before
public void createDatabase() throws SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBErrorNumbers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
......@@ -12,6 +15,47 @@ public class AuthenticationTest {
private static final String password = "taos?data";
private Connection conn;
@Test
public void connectWithoutUserByJni() {
try {
DriverManager.getConnection("jdbc:TAOS://" + host + ":0/?");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (2319): user is required", e.getMessage());
}
}
@Test
public void connectWithoutUserByRestful() {
try {
DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (2319): user is required", e.getMessage());
}
}
@Test
public void connectWithoutPasswordByJni() {
try {
DriverManager.getConnection("jdbc:TAOS://" + host + ":0/?user=root");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (231a): password is required", e.getMessage());
}
}
@Test
public void connectWithoutPasswordByRestful() {
try {
DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root");
} catch (SQLException e) {
Assert.assertEquals(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED, e.getErrorCode());
Assert.assertEquals("ERROR (231a): password is required", e.getMessage());
}
}
@Ignore
@Test
public void test() {
// change password
......
......@@ -29,6 +29,8 @@ public class BatchInsertTest {
public void before() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -21,6 +21,8 @@ public class ImportTest {
public static void before() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -270,6 +270,41 @@ public class InsertSpecialCharacterJniTest {
}
}
@Ignore
@Test
public void testSingleQuotaEscape() throws SQLException {
final long now = System.currentTimeMillis();
final String sql = "insert into t? using ? tags(?) values(?, ?, ?) t? using " + tbname2 + " tags(?) values(?,?,?) ";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
// t1
pstmt.setInt(1, 1);
pstmt.setString(2, tbname2);
pstmt.setString(3, special_character_str_5);
pstmt.setTimestamp(4, new Timestamp(now));
pstmt.setBytes(5, special_character_str_5.getBytes());
// t2
pstmt.setInt(7, 2);
pstmt.setString(8, special_character_str_5);
pstmt.setTimestamp(9, new Timestamp(now));
pstmt.setString(11, special_character_str_5);
int ret = pstmt.executeUpdate();
Assert.assertEquals(2, ret);
}
String query = "select * from ?.t? where ? < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setString(3, "ts");
pstmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(5, new Timestamp(0));
ResultSet rs = pstmt.executeQuery();
Assert.assertNotNull(rs);
}
}
@Test
public void testCase10() throws SQLException {
final long now = System.currentTimeMillis();
......@@ -293,13 +328,12 @@ public class InsertSpecialCharacterJniTest {
Assert.assertEquals(2, ret);
}
//query t1
String query = "select * from ?.t? where ts < ? and ts >= ? and ? is not null";
String query = "select * from ?.t? where ts < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(4, new Timestamp(0));
pstmt.setString(5, "f1");
ResultSet rs = pstmt.executeQuery();
rs.next();
......@@ -311,12 +345,11 @@ public class InsertSpecialCharacterJniTest {
Assert.assertNull(f2);
}
// query t2
query = "select * from t? where ts < ? and ts >= ? and ? is not null";
query = "select * from t? where ts < ? and ts >= ? and f2 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setInt(1, 2);
pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(3, new Timestamp(0));
pstmt.setString(4, "f2");
ResultSet rs = pstmt.executeQuery();
rs.next();
......
......@@ -293,13 +293,12 @@ public class InsertSpecialCharacterRestfulTest {
Assert.assertEquals(2, ret);
}
//query t1
String query = "select * from ?.t? where ts < ? and ts >= ? and ? is not null";
String query = "select * from ?.t? where ts < ? and ts >= ? and f1 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, dbName);
pstmt.setInt(2, 1);
pstmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(4, new Timestamp(0));
pstmt.setString(5, "f1");
ResultSet rs = pstmt.executeQuery();
rs.next();
......@@ -311,12 +310,11 @@ public class InsertSpecialCharacterRestfulTest {
Assert.assertNull(f2);
}
// query t2
query = "select * from t? where ts < ? and ts >= ? and ? is not null";
query = "select * from t? where ts < ? and ts >= ? and f2 is not null";
try (PreparedStatement pstmt = conn.prepareStatement(query)) {
pstmt.setInt(1, 2);
pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
pstmt.setTimestamp(3, new Timestamp(0));
pstmt.setString(4, "f2");
ResultSet rs = pstmt.executeQuery();
rs.next();
......
......@@ -22,6 +22,8 @@ public class QueryDataTest {
public void createDatabase() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.Properties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class ResetQueryCacheTest {
static Connection connection;
static Statement statement;
static String host = "127.0.0.1";
@Before
public void init() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
statement = connection.createStatement();
} catch (SQLException e) {
return;
}
@Test
public void jni() throws SQLException {
// given
Connection connection = DriverManager.getConnection("jdbc:TAOS://127.0.0.1:0/?user=root&password=taosdata&timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8");
Statement statement = connection.createStatement();
// when
boolean execute = statement.execute("reset query cache");
// then
assertFalse(execute);
assertEquals(0, statement.getUpdateCount());
statement.close();
connection.close();
}
@Test
public void testResetQueryCache() throws SQLException {
String resetSql = "reset query cache";
statement.execute(resetSql);
}
public void restful() throws SQLException {
// given
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8");
Statement statement = connection.createStatement();
// when
boolean execute = statement.execute("reset query cache");
// then
assertFalse(execute);
assertEquals(0, statement.getUpdateCount());
@After
public void close() {
try {
if (statement != null)
statement.close();
if (connection != null)
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
statement.close();
connection.close();
}
}
\ No newline at end of file
......@@ -20,6 +20,8 @@ public class SelectTest {
public void createDatabaseAndTable() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
......@@ -24,6 +24,8 @@ public class StableTest {
public static void createDatabase() {
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
......
package com.taosdata.jdbc.utils;
import org.junit.Assert;
import org.junit.Test;
public class SqlSyntaxValidatorTest {
@Test
public void isSelectSQL() {
Assert.assertTrue(SqlSyntaxValidator.isSelectSql("select * from test.weather"));
Assert.assertTrue(SqlSyntaxValidator.isSelectSql(" select * from test.weather"));
Assert.assertTrue(SqlSyntaxValidator.isSelectSql(" select * from test.weather "));
Assert.assertFalse(SqlSyntaxValidator.isSelectSql("insert into test.weather values(now, 1.1, 2)"));
}
@Test
public void isUseSQL() {
Assert.assertTrue(SqlSyntaxValidator.isUseSql("use database test"));
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
......
# TDengine Connector for Python
[TDengine] connector for Python enables python programs to access TDengine, using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
## Install
......@@ -11,8 +12,417 @@ pip install ./TDengine/src/connector/python
## Source Code
[TDengine] connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## License - AGPL
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"pass it again_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine).
# encoding:UTF-8
from taos import *
conn = connect()
dbname = "pytest_taos_stmt_multi"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
\ No newline at end of file
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \
su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
# No need to explicitly close, but ok for you
# result.close()
result = conn.query("select * from log")
for row in result:
print(row)
# No need to explicitly close, but ok for you
# result.close()
# stmt.close()
# conn.close()
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000ns',
]
conn.insert_lines(lines)
print("inserted")
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
conn.execute("drop database if exists %s" % dbname)
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
# should explicitly close the result in fetch completed or cause error
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
# explicitly close result while query failed
result.close()
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
# conn.close()
if __name__ == "__main__":
test_query(connect())
\ No newline at end of file
import taos
conn = taos.connect()
conn.execute("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
conn.execute("drop database pytest")
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
time.sleep(0.7)
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
except Exception as err:
conn.execute("drop database if exists %s" % dbname)
# conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
import taos
import random
conn = taos.connect()
dbname = "pytest_taos_subscribe"
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.execute("insert into log values(now, %d)" % i)
sub = conn.subscribe(False, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
sub.close(True)
print("# keep progress consume")
sub = conn.subscribe(False, "test", "select * from log", 1000)
result = sub.consume()
rows = result.fetch_all()
# consume from latest subscription needs root privilege(for /var/lib/taos).
assert result.row_count == 0
print("## consumed ", len(rows), "rows")
print("# consume with a stop condition")
for i in range(10):
conn.execute("insert into log values(now, %d)" % random.randint(0, 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
# sub.close()
conn.execute("drop database if exists %s" % dbname)
# conn.close()
[tool.poetry]
name = "taos"
version = "2.1.0"
description = "TDengine connector for python"
authors = ["Taosdata Inc. <support@taosdata.com>"]
license = "AGPL-3.0"
readme = "README.md"
[tool.poetry.dependencies]
python = "^2.7 || ^3.4"
typing = "*"
[tool.poetry.dev-dependencies]
pytest = [
{ version = "^4.6", python = "^2.7" },
{ version = "^6.2", python = "^3.7" }
]
pdoc = { version = "^7.1.1", python = "^3.7" }
mypy = { version = "^0.910", python = "^3.6" }
black = { version = "^21.7b0", python = "^3.6" }
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 119
......@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="taos",
version="2.0.11",
version="2.1.0",
author="Taosdata Inc.",
author_email="support@taosdata.com",
description="TDengine python client package",
......
# encoding:UTF-8
"""
# TDengine Connector for Python
from .connection import TDengineConnection
from .cursor import TDengineCursor
[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine,
using an API which is compliant with the Python DB API 2.0 (PEP-249). It uses TDengine C client library for client server communications.
# For some reason, the following is needed for VS Code (through PyLance) to
## Install
```sh
git clone --depth 1 https://github.com/taosdata/TDengine.git
pip install ./TDengine/src/connector/python
```
## Source Code
[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted on [GitHub](https://github.com/taosdata/TDengine/tree/develop/src/connector/python).
## Examples
### Query with PEP-249 API
```python
import taos
conn = taos.connect()
cursor = conn.cursor()
cursor.execute("show databases")
results = cursor.fetchall()
for row in results:
print(row)
cursor.close()
conn.close()
```
### Query with objective API
```python
import taos
conn = taos.connect()
conn.exec("create database if not exists pytest")
result = conn.query("show databases")
num_of_fields = result.field_count
for field in result.fields:
print(field)
for row in result:
print(row)
result.close()
conn.exec("drop database pytest")
conn.close()
```
### Query with async API
```python
from taos import *
from ctypes import *
import time
def fetch_callback(p_param, p_result, num_of_rows):
print("fetched ", num_of_rows, "rows")
p = cast(p_param, POINTER(Counter))
result = TaosResult(p_result)
if num_of_rows == 0:
print("fetching completed")
p.contents.done = True
result.close()
return
if num_of_rows < 0:
p.contents.done = True
result.check_error(num_of_rows)
result.close()
return None
for row in result.rows_iter(num_of_rows):
# print(row)
None
p.contents.count += result.row_count
result.fetch_rows_a(fetch_callback, p_param)
def query_callback(p_param, p_result, code):
# type: (c_void_p, c_void_p, c_int) -> None
if p_result == None:
return
result = TaosResult(p_result)
if code == 0:
result.fetch_rows_a(fetch_callback, p_param)
result.check_error(code)
class Counter(Structure):
_fields_ = [("count", c_int), ("done", c_bool)]
def __str__(self):
return "{ count: %d, done: %s }" % (self.count, self.done)
def test_query(conn):
# type: (TaosConnection) -> None
counter = Counter(count=0)
conn.query_a("select * from log.log", query_callback, byref(counter))
while not counter.done:
print("wait query callback")
time.sleep(1)
print(counter)
conn.close()
if __name__ == "__main__":
test_query(connect())
```
### Statement API - Bind row after row
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_bind_params(16)
params[0].timestamp(1626861392589)
params[1].bool(True)
params[2].null()
params[3].tinyint(2)
params[4].smallint(3)
params[5].int(4)
params[6].bigint(5)
params[7].tinyint_unsigned(6)
params[8].smallint_unsigned(7)
params[9].int_unsigned(8)
params[10].bigint_unsigned(9)
params[11].float(10.1)
params[12].double(10.11)
params[13].binary("hello")
params[14].nchar("stmt")
params[15].timestamp(1626861392589)
stmt.bind_param(params)
params[0].timestamp(1626861392590)
params[15].null()
stmt.bind_param(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 2
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Bind multi rows
```python
from taos import *
conn = connect()
dbname = "pytest_taos_stmt"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, \\
ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \\
su smallint unsigned, iu int unsigned, bu bigint unsigned, \\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
stmt.execute()
result = stmt.use_result()
assert result.affected_rows == 3
result.close()
result = conn.query("select * from log")
for row in result:
print(row)
result.close()
stmt.close()
conn.close()
```
### Statement API - Subscribe
```python
import taos
conn = taos.connect()
dbname = "pytest_taos_subscribe_callback"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
sub = conn.subscribe(True, "test", "select * from log", 1000)
print("# consume from begin")
for ts, n in sub.consume():
print(ts, n)
print("# consume new data")
for i in range(5):
conn.exec("insert into log values(now, %d)(now+1s, %d)" % (i, i))
result = sub.consume()
for ts, n in result:
print(ts, n)
print("# consume with a stop condition")
for i in range(10):
conn.exec("insert into log values(now, %d)" % int(random() * 10))
result = sub.consume()
try:
ts, n = next(result)
print(ts, n)
if n > 5:
result.stop_query()
print("## stopped")
break
except StopIteration:
continue
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
### Statement API - Subscribe asynchronously with callback
```python
from taos import *
from ctypes import *
import time
def subscribe_callback(p_sub, p_result, p_param, errno):
# type: (c_void_p, c_void_p, c_void_p, c_int) -> None
print("# fetch in callback")
result = TaosResult(p_result)
result.check_error(errno)
for row in result.rows_iter():
ts, n = row()
print(ts, n)
def test_subscribe_callback(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_subscribe_callback"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
print("# subscribe with callback")
sub = conn.subscribe(False, "test", "select * from log", 1000, subscribe_callback)
for i in range(10):
conn.exec("insert into log values(now, %d)" % i)
time.sleep(0.7)
sub.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_subscribe_callback(connect())
```
### Statement API - Stream
```python
from taos import *
from ctypes import *
def stream_callback(p_param, p_result, p_row):
# type: (c_void_p, c_void_p, c_void_p) -> None
if p_result == None or p_row == None:
return
result = TaosResult(p_result)
row = TaosRow(result, p_row)
try:
ts, count = row()
p = cast(p_param, POINTER(Counter))
p.contents.count += count
print("[%s] inserted %d in 5s, total count: %d" % (ts.strftime("%Y-%m-%d %H:%M:%S"), count, p.contents.count))
except Exception as err:
print(err)
raise err
class Counter(ctypes.Structure):
_fields_ = [
("count", c_int),
]
def __str__(self):
return "%d" % self.count
def test_stream(conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stream"
try:
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.exec("create table if not exists log(ts timestamp, n int)")
result = conn.query("select count(*) from log interval(5s)")
assert result.field_count == 2
counter = Counter()
counter.count = 0
stream = conn.stream("select count(*) from log interval(5s)", stream_callback, param=byref(counter))
for _ in range(0, 20):
conn.exec("insert into log values(now,0)(now+1s, 1)(now + 2s, 2)")
time.sleep(2)
stream.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
conn.exec("drop database if exists %s" % dbname)
conn.close()
raise err
if __name__ == "__main__":
test_stream(connect())
```
### Insert with line protocol
```python
import taos
conn = taos.connect()
dbname = "pytest_line"
conn.exec("drop database if exists %s" % dbname)
conn.exec("create database if not exists %s precision 'us'" % dbname)
conn.select_db(dbname)
lines = [
'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000ns',
'st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000ns',
'stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
print("inserted")
lines = [
'stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000ns',
]
conn.insert_lines(lines)
result = conn.query("show tables")
for row in result:
print(row)
result.close()
conn.exec("drop database if exists %s" % dbname)
conn.close()
```
## License - AGPL-3.0
Keep same with [TDengine](https://github.com/taosdata/TDengine).
"""
from .connection import TaosConnection
# For some reason, the following is needed for VS Code (through PyLance) to
# recognize that "error" is a valid module of the "taos" package.
from .error import ProgrammingError
from .error import *
from .bind import *
from .field import *
from .cursor import *
from .result import *
from .statement import *
from .subscription import *
try:
import importlib.metadata
__version__ = importlib.metadata.version("taos")
except:
None
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
paramstyle = "pyformat"
__all__ = [
# functions
"connect",
"new_bind_param",
"new_bind_params",
"new_multi_binds",
"new_multi_bind",
# objects
"TaosBind",
"TaosConnection",
"TaosCursor",
"TaosResult",
"TaosRows",
"TaosRow",
"TaosStmt",
"PrecisionEnum",
]
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
# type: (..., ...) -> TaosConnection
"""Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
......@@ -25,4 +483,4 @@ def connect(*args, **kwargs):
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
return TaosConnection(*args, **kwargs)
此差异已折叠。
from .cursor import TDengineCursor
from .subscription import TDengineSubscription
from .cinterface import CTaosInterface
# encoding:UTF-8
from types import FunctionType
from .cinterface import *
from .cursor import TaosCursor
from .subscription import TaosSubscription
from .statement import TaosStmt
from .stream import TaosStream
from .result import *
class TDengineConnection(object):
""" TDengine connection object
"""
class TaosConnection(object):
"""TDengine connection object"""
def __init__(self, *args, **kwargs):
self._conn = None
......@@ -21,63 +25,130 @@ class TDengineConnection(object):
def config(self, **kwargs):
# host
if 'host' in kwargs:
self._host = kwargs['host']
if "host" in kwargs:
self._host = kwargs["host"]
# user
if 'user' in kwargs:
self._user = kwargs['user']
if "user" in kwargs:
self._user = kwargs["user"]
# password
if 'password' in kwargs:
self._password = kwargs['password']
if "password" in kwargs:
self._password = kwargs["password"]
# database
if 'database' in kwargs:
self._database = kwargs['database']
if "database" in kwargs:
self._database = kwargs["database"]
# port
if 'port' in kwargs:
self._port = kwargs['port']
if "port" in kwargs:
self._port = kwargs["port"]
# config
if 'config' in kwargs:
self._config = kwargs['config']
if "config" in kwargs:
self._config = kwargs["config"]
self._chandle = CTaosInterface(self._config)
self._conn = self._chandle.connect(
self._host,
self._user,
self._password,
self._database,
self._port)
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
def close(self):
"""Close current connection.
"""
return CTaosInterface.close(self._conn)
def subscribe(self, restart, topic, sql, interval):
"""Create a subscription.
"""
"""Close current connection."""
if self._conn:
taos_close(self._conn)
self._conn = None
@property
def client_info(self):
# type: () -> str
return taos_get_client_info()
@property
def server_info(self):
# type: () -> str
return taos_get_server_info(self._conn)
def select_db(self, database):
# type: (str) -> None
taos_select_db(self._conn, database)
def execute(self, sql):
# type: (str) -> None
"""Simplely execute sql ignoring the results"""
res = taos_query(self._conn, sql)
taos_free_result(res)
def query(self, sql):
# type: (str) -> TaosResult
result = taos_query(self._conn, sql)
return TaosResult(result, True, self)
def query_a(self, sql, callback, param):
# type: (str, async_query_callback_type, c_void_p) -> None
"""Asynchronously query a sql with callback function"""
taos_query_a(self._conn, sql, callback, param)
def subscribe(self, restart, topic, sql, interval, callback=None, param=None):
# type: (bool, str, str, int, subscribe_callback_type, c_void_p) -> TaosSubscription
"""Create a subscription."""
if self._conn is None:
return None
sub = CTaosInterface.subscribe(
self._conn, restart, topic, sql, interval)
return TDengineSubscription(sub)
sub = taos_subscribe(self._conn, restart, topic, sql, interval, callback, param)
return TaosSubscription(sub, callback != None)
def insertLines(self, lines):
"""
insert lines through line protocol
"""
def statement(self, sql=None):
# type: (str | None) -> TaosStmt
if self._conn is None:
return None
return CTaosInterface.insertLines(self._conn, lines)
def cursor(self):
"""Return a new Cursor object using the connection.
stmt = taos_stmt_init(self._conn)
if sql != None:
taos_stmt_prepare(stmt, sql)
return TaosStmt(stmt)
def load_table_info(self, tables):
# type: (str) -> None
taos_load_table_info(self._conn, tables)
def stream(self, sql, callback, stime=0, param=None, callback2=None):
# type: (str, Callable[[Any, TaosResult, TaosRows], None], int, Any, c_void_p) -> TaosStream
# cb = cast(callback, stream_callback_type)
# ref = byref(cb)
stream = taos_open_stream(self._conn, sql, callback, stime, param, callback2)
return TaosStream(stream)
def insert_lines(self, lines):
# type: (list[str]) -> None
"""Line protocol and schemaless support
## Example
```python
import taos
conn = taos.connect()
conn.exec("drop database if exists test")
conn.select_db("test")
lines = [
'ste,t2=5,t3=L"ste" c1=true,c2=4,c3="string" 1626056811855516532',
]
conn.insert_lines(lines)
```
## Exception
```python
try:
conn.insert_lines(lines)
except SchemalessError as err:
print(err)
```
"""
return TDengineCursor(self)
return taos_insert_lines(self._conn, lines)
def cursor(self):
# type: () -> TaosCursor
"""Return a new Cursor object using the connection."""
return TaosCursor(self)
def commit(self):
"""Commit any pending transaction to the database.
......@@ -87,17 +158,18 @@ class TDengineConnection(object):
pass
def rollback(self):
"""Void functionality
"""
"""Void functionality"""
pass
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
"""Clear unused result set on this connection."""
pass
def __del__(self):
self.close()
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
conn = TaosConnection()
conn.close()
print("Hello world")
# encoding:UTF-8
"""Constants in TDengine python
"""
from .dbapi import *
class FieldType(object):
"""TDengine Field Types
"""
"""TDengine Field Types"""
# type_code
C_NULL = 0
C_BOOL = 1
......@@ -34,9 +33,9 @@ class FieldType(object):
C_INT_UNSIGNED_NULL = 4294967295
C_BIGINT_NULL = -9223372036854775808
C_BIGINT_UNSIGNED_NULL = 18446744073709551615
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
C_FLOAT_NULL = float("nan")
C_DOUBLE_NULL = float("nan")
C_BINARY_NULL = bytearray([int("0xff", 16)])
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
......
此差异已折叠。
"""Type Objects and Constructors.
"""
import time
import datetime
class DBAPITypeObject(object):
def __init__(self, *values):
self.values = values
def __com__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
Date = datetime.date
Time = datetime.time
Timestamp = datetime.datetime
def DataFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
Binary = bytes
# STRING = DBAPITypeObject(*constants.FieldType.get_string_types())
# BINARY = DBAPITypeObject(*constants.FieldType.get_binary_types())
# NUMBER = BAPITypeObject(*constants.FieldType.get_number_types())
# DATETIME = DBAPITypeObject(*constants.FieldType.get_timestamp_types())
# ROWID = DBAPITypeObject()
此差异已折叠。
此差异已折叠。
class PrecisionEnum(object):
"""Precision enums"""
Milliseconds = 0
Microseconds = 1
Nanoseconds = 2
class PrecisionError(Exception):
"""Python datetime does not support nanoseconds error"""
pass
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册