提交 0c32e404 编写于 作者: E Elias Soong

Merge branch 'develop' into Doc-For-Creating-Multi-Table-In-A-Batch

......@@ -146,7 +146,7 @@ matrix:
branch_pattern: coverity_scan
- os: linux
dist: xenial
dist: trusty
language: c
git:
- depth: 1
......@@ -156,8 +156,9 @@ matrix:
packages:
- build-essential
- cmake
- binutils-2.26
env:
- DESC="xenial build"
- DESC="trusty/gcc-4.8/bintuils-2.26 build"
before_script:
- export TZ=Asia/Harbin
......@@ -168,7 +169,7 @@ matrix:
script:
- cmake .. > /dev/null
- make
- export PATH=/usr/lib/binutils-2.26/bin:$PATH && make
- os: linux
dist: bionic
......@@ -200,7 +201,7 @@ matrix:
dist: bionic
language: c
compiler: clang
env: DESC="linux/clang build"
env: DESC="arm64 linux/clang build"
git:
- depth: 1
......@@ -238,7 +239,7 @@ matrix:
- build-essential
- cmake
env:
- DESC="xenial build"
- DESC="arm64 xenial build"
before_script:
- export TZ=Asia/Harbin
......
......@@ -104,6 +104,10 @@ pipeline {
find pytest -name '*'sql|xargs rm -rf
./test-all.sh p2
date'''
sh '''
cd ${WKC}/tests
./test-all.sh b4fq
'''
}
}
stage('test_b1') {
......
......@@ -33,11 +33,17 @@ To build TDengine, use [CMake](https://cmake.org/) 3.5 or higher versions in the
## Install tools
### Ubuntu & Debian:
### Ubuntu 16.04 and above & Debian:
```bash
sudo apt-get install -y gcc cmake build-essential git
```
### Ubuntu 14.04:
```bash
sudo apt-get install -y gcc cmake3 build-essential git binutils-2.26
export PATH=/usr/lib/binutils-2.26/bin:$PATH
```
To compile and package the JDBC driver source code, you should have a Java jdk-8 or higher and Apache Maven 2.7 or higher installed.
To install openjdk-8:
```bash
......
......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.12.0")
SET(TD_VER_NUMBER "2.0.13.0")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -4,6 +4,8 @@
TDengine采用关系型数据模型,需要建库、建表。因此对于一个具体的应用场景,需要考虑库的设计,超级表和普通表的设计。本节不讨论细致的语法规则,只介绍概念。
关于数据建模请参考<a href="https://www.taosdata.com/blog/2020/11/11/1945.html">视频教程</a>
## 创建库
不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为让各种场景下TDengine都能最大效率的工作,TDengine建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除SQL标准的选项外,应用还可以指定保留时长、副本数、内存块个数、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如:
......@@ -60,4 +62,3 @@ TDengine支持多列模型,只要物理量是一个数据采集点同时采集
TDengine建议尽可能采用多列模型,因为插入效率以及存储效率更高。但对于有些场景,一个采集点的采集量的种类经常变化,这个时候,如果采用多列模型,就需要频繁修改超级表的结构定义,让应用变的复杂,这个时候,采用单列模型会显得简单。
关于数据建模请参考<a href="https://www.taosdata.com/blog/2020/11/11/1945.html">视频教程</a>
......@@ -6,6 +6,8 @@
TDengine的集群管理极其简单,除添加和删除节点需要人工干预之外,其他全部是自动完成,最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。
关于集群搭建请参考<a href="https://www.taosdata.com/blog/2020/11/11/1961.html">视频教程</a>
## 准备工作
**第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS,请联系网络管理员在DNS上做好相关配置】
......@@ -227,4 +229,3 @@ SHOW MNODES;
TDengine提供一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。如果副本数为奇数,即使配置了arbitrator, 系统也不会去建立连接。
关于集群搭建请参考<a href="https://www.taosdata.com/blog/2020/11/11/1961.html">视频教程</a>
......@@ -348,7 +348,7 @@ function set_ipAsFqdn() {
}
function local_fqdn_check() {
#serverFqdn=$(hostname -f)
#serverFqdn=$(hostname)
echo
echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}"
echo
......@@ -911,7 +911,7 @@ function install_TDengine() {
## ==============================Main program starts from here============================
serverFqdn=$(hostname -f)
serverFqdn=$(hostname)
if [ "$verType" == "server" ]; then
# Install server and client
if [ -x ${bin_dir}/taosd ]; then
......
......@@ -345,7 +345,7 @@ function set_ipAsFqdn() {
}
function local_fqdn_check() {
#serverFqdn=$(hostname -f)
#serverFqdn=$(hostname)
echo
echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}"
echo
......@@ -881,7 +881,7 @@ function install_PowerDB() {
## ==============================Main program starts from here============================
serverFqdn=$(hostname -f)
serverFqdn=$(hostname)
if [ "$verType" == "server" ]; then
# Install server and client
if [ -x ${bin_dir}/powerd ]; then
......
......@@ -228,7 +228,7 @@ function set_ipAsFqdn() {
}
function local_fqdn_check() {
#serverFqdn=$(hostname -f)
#serverFqdn=$(hostname)
echo
echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}"
echo
......@@ -492,5 +492,5 @@ function install_TDengine() {
## ==============================Main program starts from here============================
serverFqdn=$(hostname -f)
serverFqdn=$(hostname)
install_TDengine
name: tdengine
base: core18
version: '2.0.12.0'
version: '2.0.13.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
description: |
......@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd
- usr/bin/taos
- usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.12.0
- usr/lib/libtaos.so.2.0.13.0
- usr/lib/libtaos.so.1
- usr/lib/libtaos.so
......
......@@ -26,7 +26,7 @@ extern "C" {
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
//struct SSchema;
#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS)
/**
* get the number of tags of this table
......@@ -91,7 +91,7 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
* @param numOfCols
* @return
*/
bool isValidSchema(struct SSchema *pSchema, int32_t numOfCols);
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
/**
* get the schema for the "tbname" column. it is a built column
......
......@@ -460,7 +460,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
} else {
assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)&pInfo->pSqlExpr->param[1].i64Key;
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)&pInfo->pSqlExpr->param[1].i64;
pRes->length[columnIndex] = bytes;
}
} else {
......
......@@ -95,7 +95,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
return;
}
nPrintTsc(sqlstr);
nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
......@@ -365,7 +365,7 @@ static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle;
terrno = *(int32_t*) pMsg->msg;
tfree(pMsg->msg);
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
(*fp)(pMsg->thandle, NULL, terrno);
}
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
......@@ -388,7 +388,7 @@ void tscAsyncResultOnError(SSqlObj *pSql) {
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
tscError("%p invoke user specified function due to error occured, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
......@@ -421,7 +421,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("%p update local table meta, continue to process sql and send the corresponding query", pSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -521,7 +521,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
return;
}
......
......@@ -204,7 +204,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
const int32_t NUM_OF_DESC_TABLE_COLUMNS = 4;
const int32_t TYPE_COLUMN_LENGTH = 16;
const int32_t TYPE_COLUMN_LENGTH = 20;
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
......
......@@ -95,11 +95,11 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
pCtx->param[2].i64Key = pQueryInfo->order.order;
pCtx->param[2].i64 = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
pCtx->param[1].i64 = pQueryInfo->order.orderColId;
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
}
......@@ -1064,7 +1064,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
}
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
pCtx->param[0].i64Key = pExpr->param[0].i64Key;
pCtx->param[0].i64 = pExpr->param[0].i64;
}
pCtx->currentStage = MERGE_STAGE;
......
......@@ -20,6 +20,7 @@
#include "os.h"
#include "ttype.h"
#include "hash.h"
#include "tscUtil.h"
#include "tschemautil.h"
......@@ -40,46 +41,9 @@ enum {
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
static int32_t tscToInteger(SStrToken *pToken, int64_t *value, char **endPtr) {
if (pToken->n == 0) {
return TK_ILLEGAL;
}
int32_t radix = 10;
if (pToken->type == TK_HEX) {
radix = 16;
} else if (pToken->type == TK_BIN) {
radix = 2;
}
errno = 0;
*value = strtoll(pToken->z, endPtr, radix);
if (**endPtr == 'e' || **endPtr == 'E' || **endPtr == '.') {
errno = 0;
double v = round(strtod(pToken->z, endPtr));
if (v > INT64_MAX || v <= INT64_MIN) {
errno = ERANGE;
} else {
*value = (int64_t)v;
}
}
// not a valid integer number, return error
if (*endPtr - pToken->z != pToken->n) {
return TK_ILLEGAL;
}
return pToken->type;
}
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
if (pToken->n == 0) {
return TK_ILLEGAL;
}
errno = 0;
*value = strtod(pToken->z, endPtr);
*value = strtold(pToken->z, endPtr);
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
......@@ -163,22 +127,31 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
return TSDB_CODE_SUCCESS;
}
// todo extract the null value check
static bool isNullStr(SStrToken* pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
int16_t timePrec) {
int64_t iv;
int32_t numType;
char * endptr = NULL;
errno = 0; // clear the previous existed error information
int32_t ret;
char *endptr = NULL;
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return tscInvalidSQLErrMsg(msg, "invalid numeric data", pToken->z);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_TRUE;
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_FALSE;
} else if (strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0) {
*(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
......@@ -188,61 +161,85 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
*(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
} else if (pToken->type == TK_NULL) {
*(uint8_t *)payload = TSDB_DATA_BOOL_NULL;
} else {
return tscInvalidSQLErrMsg(msg, "invalid bool data", pToken->z);
}
}
break;
}
case TSDB_DATA_TYPE_TINYINT:
if (pToken->type == TK_NULL) {
*((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
*((int8_t *)payload) = TSDB_DATA_TINYINT_NULL;
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
} else {
numType = tscToInteger(pToken, &iv, &endptr);
if (TK_ILLEGAL == numType) {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid tinyint data", pToken->z);
} else if (errno == ERANGE || iv > INT8_MAX || iv <= INT8_MIN) {
return tscInvalidSQLErrMsg(msg, "tinyint data overflow", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return tscInvalidSQLErrMsg(msg, "data overflow", pToken->z);
}
*((uint8_t *)payload) = (uint8_t)iv;
}
break;
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return tscInvalidSQLErrMsg(msg, "unsigned tinyint data overflow", pToken->z);
}
*((int8_t *)payload) = (int8_t)iv;
*((uint8_t *)payload) = (uint8_t)iv;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (pToken->type == TK_NULL) {
*((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
if (isNullStr(pToken)) {
*((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
} else {
numType = tscToInteger(pToken, &iv, &endptr);
if (TK_ILLEGAL == numType) {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid smallint data", pToken->z);
} else if (errno == ERANGE || iv > INT16_MAX || iv <= INT16_MIN) {
} else if (!IS_VALID_SMALLINT(iv)) {
return tscInvalidSQLErrMsg(msg, "smallint data overflow", pToken->z);
}
*((int16_t *)payload) = (int16_t)iv;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
*((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return tscInvalidSQLErrMsg(msg, "unsigned smallint data overflow", pToken->z);
}
*((uint16_t *)payload) = (uint16_t)iv;
}
break;
case TSDB_DATA_TYPE_INT:
if (pToken->type == TK_NULL) {
*((int32_t *)payload) = TSDB_DATA_INT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_INT_NULL;
} else {
numType = tscToInteger(pToken, &iv, &endptr);
if (TK_ILLEGAL == numType) {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid int data", pToken->z);
} else if (errno == ERANGE || iv > INT32_MAX || iv <= INT32_MIN) {
} else if (!IS_VALID_INT(iv)) {
return tscInvalidSQLErrMsg(msg, "int data overflow", pToken->z);
}
......@@ -251,17 +248,30 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
*((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return tscInvalidSQLErrMsg(msg, "unsigned int data overflow", pToken->z);
}
*((uint32_t *)payload) = (uint32_t)iv;
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (pToken->type == TK_NULL) {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
} else {
numType = tscToInteger(pToken, &iv, &endptr);
if (TK_ILLEGAL == numType) {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid bigint data", pToken->z);
} else if (errno == ERANGE || iv == INT64_MIN) {
} else if (!IS_VALID_BIGINT(iv)) {
return tscInvalidSQLErrMsg(msg, "bigint data overflow", pToken->z);
}
......@@ -269,11 +279,23 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
*((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return tscInvalidSQLErrMsg(msg, "unsigned bigint data overflow", pToken->z);
}
*((uint64_t *)payload) = iv;
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (pToken->type == TK_NULL) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else {
double dv;
......@@ -281,24 +303,16 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
}
float fv = (float)dv;
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (fv > FLT_MAX || fv < -FLT_MAX)) {
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return tscInvalidSQLErrMsg(msg, "illegal float data", pToken->z);
}
if (isinf(fv) || isnan(fv)) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
}
*((float *)payload) = fv;
*((float *)payload) = (float)dv;
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (pToken->type == TK_NULL) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else if ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0)) {
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else {
double dv;
......@@ -306,16 +320,12 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || (dv > DBL_MAX || dv < -DBL_MAX)) {
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return tscInvalidSQLErrMsg(msg, "illegal double data", pToken->z);
}
if (isinf(dv) || isnan(dv)) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else {
*((double *)payload) = dv;
}
}
break;
case TSDB_DATA_TYPE_BINARY:
......@@ -337,7 +347,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SStrToken *pToken, char *payload,
setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
size_t output = 0;
int32_t output = 0;
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
......
......@@ -84,35 +84,35 @@ static int normalStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
var->nLen = 0;
if (tb->is_null != NULL && *(tb->is_null)) {
var->nType = TSDB_DATA_TYPE_NULL;
var->i64Key = 0;
var->i64 = 0;
continue;
}
var->nType = tb->buffer_type;
switch (tb->buffer_type) {
case TSDB_DATA_TYPE_NULL:
var->i64Key = 0;
var->i64 = 0;
break;
case TSDB_DATA_TYPE_BOOL:
var->i64Key = (*(int8_t*)tb->buffer) ? 1 : 0;
var->i64 = (*(int8_t*)tb->buffer) ? 1 : 0;
break;
case TSDB_DATA_TYPE_TINYINT:
var->i64Key = *(int8_t*)tb->buffer;
var->i64 = *(int8_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_SMALLINT:
var->i64Key = *(int16_t*)tb->buffer;
var->i64 = *(int16_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_INT:
var->i64Key = *(int32_t*)tb->buffer;
var->i64 = *(int32_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
var->i64Key = *(int64_t*)tb->buffer;
var->i64 = *(int64_t*)tb->buffer;
break;
case TSDB_DATA_TYPE_FLOAT:
......@@ -219,7 +219,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
taosStringBuilderAppendInteger(&sb, var->i64Key);
taosStringBuilderAppendInteger(&sb, var->i64);
break;
case TSDB_DATA_TYPE_FLOAT:
......@@ -616,7 +616,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
case TSDB_DATA_TYPE_NCHAR: {
switch (bind->buffer_type) {
case TSDB_DATA_TYPE_NCHAR: {
size_t output = 0;
int32_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -678,7 +678,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
return TSDB_CODE_SUCCESS;
case TSDB_DATA_TYPE_NCHAR: {
size_t output = 0;
int32_t output = 0;
if (!taosMbsToUcs4(bind->buffer, *bind->length, varDataVal(data + param->offset), param->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
......
......@@ -19,6 +19,7 @@
#define _GNU_SOURCE
#include "os.h"
#include "ttype.h"
#include "qAst.h"
#include "taos.h"
#include "taosmsg.h"
......@@ -75,11 +76,11 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
static int32_t convertFunctionId(int32_t optr, int16_t* functionId);
static uint8_t convertOptr(SStrToken *pToken);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery);
static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery);
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery);
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
......@@ -484,10 +485,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_CREATE_USER:
case TSDB_SQL_ALTER_USER: {
const char* msg5 = "invalid user rights";
const char* msg7 = "not support options";
const char* msg2 = "invalid user/account name";
const char* msg3 = "name too long";
const char* msg5 = "invalid user rights";
const char* msg7 = "not support options";
pCmd->command = pInfo->type;
......@@ -950,7 +951,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
return false;
}
if (pField->type < TSDB_DATA_TYPE_BOOL || pField->type > TSDB_DATA_TYPE_NCHAR) {
if (!isValidDataType(pField->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return false;
}
......@@ -1011,7 +1012,7 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
return false;
}
if (p->type < TSDB_DATA_TYPE_BOOL || p->type > TSDB_DATA_TYPE_NCHAR) {
if (!isValidDataType(p->type)) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
}
......@@ -1475,7 +1476,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) {
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
assert(pSelection != NULL && pCmd != NULL);
const char* msg2 = "functions can not be mixed up";
......@@ -1531,7 +1532,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
addPrimaryTsColIntoResult(pQueryInfo);
}
if (!functionCompatibleCheck(pQueryInfo, joinQuery)) {
if (!functionCompatibleCheck(pQueryInfo, joinQuery, intervalQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -1754,7 +1755,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
// set reverse order scan data blocks for last query
if (functionID == TSDB_FUNC_LAST) {
pExpr->numOfParams = 1;
pExpr->param[0].i64Key = TSDB_ORDER_DESC;
pExpr->param[0].i64 = TSDB_ORDER_DESC;
pExpr->param[0].nType = TSDB_DATA_TYPE_INT;
}
......@@ -1945,7 +1946,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
int16_t colType = pSchema->type;
if (colType <= TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
if (!IS_NUMERIC_TYPE(colType)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -2105,7 +2106,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { // group by normal columns
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, colIndex + i);
pExpr->numOfParams = 1;
pExpr->param->i64Key = TSDB_ORDER_ASC;
pExpr->param->i64 = TSDB_ORDER_ASC;
break;
}
......@@ -2182,7 +2183,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. valid the column type
int16_t colType = pSchema[index.columnIndex].type;
if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) {
if (!IS_NUMERIC_TYPE(colType)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -2727,7 +2728,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->param[0].i64Key, &type, &bytes,
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->param[0].i64, &type, &bytes,
&interBytes, 0, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -2810,7 +2811,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
return false;
}
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery) {
int32_t startIdx = 0;
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
......@@ -2826,6 +2827,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery)) {
return false;
}
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
......@@ -2850,7 +2855,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
}
}
if (functionId == TSDB_FUNC_LAST_ROW && joinQuery) {
if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery)) {
return false;
}
}
......@@ -4392,8 +4397,8 @@ int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t t
*
* Additional check to avoid data overflow
*/
if (pRight->val.i64Key <= INT64_MAX / 1000) {
pRight->val.i64Key *= 1000;
if (pRight->val.i64 <= INT64_MAX / 1000) {
pRight->val.i64 *= 1000;
}
} else if (pRight->nSQLOptr == TK_FLOAT && timePrecision == TSDB_TIME_PRECISION_MILLI) {
pRight->val.dKey *= 1000;
......@@ -4594,8 +4599,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name";
const char* msg2 = "only support order by primary timestamp or queried column";
const char* msg3 = "only support order by primary timestamp or first tag in groupby clause";
const char* msg2 = "only support order by primary timestamp or first tag in groupby clause allowed";
const char* msg3 = "invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed";
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -5275,8 +5280,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
const char* msg0 = "soffset/offset can not be less than 0";
const char* msg1 = "slimit/soffset only available for STable query";
const char* msg2 = "functions mixed up in table query";
const char* msg3 = "slimit/soffset can not apply to projection query";
const char* msg2 = "slimit/soffset can not apply to projection query";
// handle the limit offset value, validate the limit
pQueryInfo->limit = pQuerySql->limit;
......@@ -5301,7 +5305,7 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
if (!tscQueryTags(pQueryInfo)) { // local handle the super table tag query
if (tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// for projection query on super table, all queries are subqueries
......@@ -5359,24 +5363,6 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn
if (pQueryInfo->slimit.limit != -1 || pQueryInfo->slimit.offset != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
size_t size = taosArrayGetSize(pQueryInfo->exprList);
bool hasTags = false;
bool hasOtherFunc = false;
// filter the query functions operating on "tbname" column that are not supported by normal columns.
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
hasTags = true;
} else {
hasOtherFunc = true;
}
}
if (hasTags && hasOtherFunc) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
return TSDB_CODE_SUCCESS;
......@@ -5395,22 +5381,22 @@ static int32_t setKeepOption(SSqlCmd* pCmd, SCreateDbMsg* pMsg, SCreateDBInfo* p
tVariantListItem* p0 = taosArrayGet(pKeep, 0);
switch (s) {
case 1: {
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64Key);
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64);
}
break;
case 2: {
tVariantListItem* p1 = taosArrayGet(pKeep, 1);
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64Key);
pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64Key);
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64);
pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64);
break;
}
case 3: {
tVariantListItem* p1 = taosArrayGet(pKeep, 1);
tVariantListItem* p2 = taosArrayGet(pKeep, 2);
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64Key);
pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64Key);
pMsg->daysToKeep2 = htonl((int32_t)p2->pVar.i64Key);
pMsg->daysToKeep = htonl((int32_t)p0->pVar.i64);
pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64);
pMsg->daysToKeep2 = htonl((int32_t)p2->pVar.i64);
break;
}
default: { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); }
......@@ -5576,7 +5562,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
if ((pExpr->functionId != TSDB_FUNC_TAG_DUMMY && pExpr->functionId != TSDB_FUNC_TS_DUMMY) &&
!(pExpr->functionId == TSDB_FUNC_PRJ && TSDB_COL_IS_UD_COL(pExpr->colInfo.flag))) {
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex];
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64Key, &pExpr->resType,
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, (int32_t)pExpr->param[0].i64, &pExpr->resType,
&pExpr->resBytes, &pExpr->interBytes, tagLength, isSTable);
}
}
......@@ -5848,14 +5834,43 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
return TSDB_CODE_SUCCESS;
}
static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
bool tagProjection = false;
bool tableCounting = false;
int32_t numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAGPRJ) {
tagProjection = true;
continue;
}
if (functionId == TSDB_FUNC_COUNT) {
assert(pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX);
tableCounting = true;
}
}
return (tableCounting && tagProjection)? -1:0;
}
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up";
// only retrieve tags, group by is not supportted
if (tscQueryTags(pQueryInfo)) {
if (doTagFunctionCheck(pQueryInfo) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || pQueryInfo->interval.interval > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
} else {
......@@ -6278,10 +6293,12 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "functions not allowed in CQ";
const char* msg3 = "fill only available for interval query";
const char* msg4 = "fill option not supported in stream computing";
const char* msg5 = "sql too long"; // todo ADD support
const char* msg6 = "from missing in subclause";
const char* msg7 = "time interval is required";
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......@@ -6291,10 +6308,10 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if sql specifies db, use it, otherwise use default db
SStrToken* pzTableName = &(pCreateTable->name);
SStrToken* pName = &(pCreateTable->name);
SQuerySQL* pQuerySql = pCreateTable->pSelect;
if (tscValidateName(pzTableName) != TSDB_CODE_SUCCESS) {
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -6320,7 +6337,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
}
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable, false) != TSDB_CODE_SUCCESS) {
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -6333,15 +6350,19 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
// set interval value
if (parseIntervalClause(pSql, pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
} else {
}
if ((pQueryInfo->interval.interval > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_SQL;
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (!tscIsProjectionQuery(pQueryInfo) && pQueryInfo->interval.interval == 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// set the created table[stream] name
code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
code = tscSetTableFullName(pTableMetaInfo, pName, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6565,7 +6586,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
int32_t joinQuery = (pQuerySql->from != NULL && taosArrayGetSize(pQuerySql->from) > 2);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery) != TSDB_CODE_SUCCESS) {
int32_t intervalQuery = !(pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, intervalQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -6738,7 +6761,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
// check for dividing by 0
if ((*pExpr)->_node.optr == TSDB_BINARY_OP_DIVIDE) {
if (pRight->nodeType == TSQL_NODE_VALUE) {
if (pRight->pVal->nType == TSDB_DATA_TYPE_INT && pRight->pVal->i64Key == 0) {
if (pRight->pVal->nType == TSDB_DATA_TYPE_INT && pRight->pVal->i64 == 0) {
return TSDB_CODE_TSC_INVALID_SQL;
} else if (pRight->pVal->nType == TSDB_DATA_TYPE_FLOAT && pRight->pVal->dKey == 0) {
return TSDB_CODE_TSC_INVALID_SQL;
......
......@@ -66,30 +66,24 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
return pTableMeta->tableInfo;
}
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
if (!VALIDNUMOFCOLS(numOfCols)) {
return false;
}
/* first column must be the timestamp, which is a primary key */
if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
return false;
}
/* type is valid, length is valid */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
int32_t rowLen = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
// 1. valid types
if (pSchema[i].type > TSDB_DATA_TYPE_TIMESTAMP || pSchema[i].type < TSDB_DATA_TYPE_BOOL) {
if (!isValidDataType(pSchema[i].type)) {
return false;
}
// 2. valid length for each type
if (pSchema[i].type == TSDB_DATA_TYPE_TIMESTAMP) {
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY) {
if (pSchema[i].bytes > TSDB_MAX_BINARY_LEN) {
return false;
}
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
if (pSchema[i].bytes > TSDB_MAX_NCHAR_LEN) {
return false;
}
} else {
if (pSchema[i].bytes != tDataTypeDesc[pSchema[i].type].nSize) {
return false;
......@@ -106,8 +100,32 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
rowLen += pSchema[i].bytes;
}
// valid total length
return (rowLen <= TSDB_MAX_BYTES_PER_ROW);
return rowLen <= maxLen;
}
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
if (!VALIDNUMOFCOLS(numOfCols)) {
return false;
}
if (!VALIDNUMOFTAGS(numOfTags)) {
return false;
}
/* first column must be the timestamp, which is a primary key */
if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
return false;
}
if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
return false;
}
if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
return false;
}
return true;
}
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
......
......@@ -755,12 +755,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL ||
pColSchema->type > TSDB_DATA_TYPE_NCHAR) {
if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || !isValidDataType(pColSchema->type)) {
tscError("%p tid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex.columnIndex,
pColSchema->name);
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -826,7 +824,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
}
}
......@@ -866,7 +864,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64Key);
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
}
}
......@@ -944,7 +942,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
(pColSchema->type < TSDB_DATA_TYPE_BOOL || pColSchema->type > TSDB_DATA_TYPE_NCHAR)) {
(!isValidDataType(pColSchema->type))) {
tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
pSql, pTableMeta->id.tid, pTableMeta->id.uid, pTableMetaInfo->name, total, numOfTagColumns,
pCol->colIndex.columnIndex, pColSchema->name);
......@@ -1830,7 +1828,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
assert(i == 0);
}
assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
assert(isValidDataType(pSchema->type));
pSchema++;
}
......
......@@ -327,7 +327,7 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, int64_t*
return NULL;
}
nPrintTsc(sqlstr);
nPrintTsc("%s", sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
......@@ -788,18 +788,34 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
......
......@@ -65,44 +65,67 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
return retryDelta;
}
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle;
SSqlObj * pSql = pStream->pSql;
static void setRetryInfo(SSqlStream* pStream, int32_t code) {
SSqlObj* pSql = pStream->pSql;
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
pSql->param = pStream;
pSql->res.code = code;
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
}
static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
SSqlStream *pStream = (SSqlStream *)param;
assert(pStream->pSql == tres);
SSqlObj* pSql = (SSqlObj*) tres;
pSql->fp = doLaunchQuery;
pSql->fetchFp = doLaunchQuery;
pSql->res.completed = false;
if (code != TSDB_CODE_SUCCESS) {
setRetryInfo(pStream, code);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int code = tscGetTableMeta(pSql, pTableMetaInfo);
pSql->res.code = code;
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code;
}
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
} else {
// failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) {
tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscDebug("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscDoQuery(pStream->pSql);
tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
tscIncStreamExecutionCount(pStream);
} else {
setRetryInfo(pStream, code);
}
}
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle;
doLaunchQuery(pStream, pStream->pSql, 0);
}
static void tscProcessStreamTimer(void *handle, void *tmrId) {
SSqlStream *pStream = (SSqlStream *)handle;
if (pStream == NULL) return;
if (pStream->pTimer != tmrId) return;
if (pStream == NULL || pStream->pTimer != tmrId) {
return;
}
pStream->pTimer = NULL;
pStream->numOfRes = 0; // reset the numOfRes.
......@@ -392,12 +415,17 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
tscSetRetryTimer(pStream, pSql, timer);
}
static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
return -1;
}
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
(int64_t)pQueryInfo->interval.interval, minIntervalTime);
......@@ -436,6 +464,8 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo->interval.interval = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->interval.sliding = 0;
}
return TSDB_CODE_SUCCESS;
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
......@@ -485,34 +515,19 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
if (pSql == NULL) {
return;
}
SSqlCmd* pCmd = &pSql->cmd;
pSql->res.code = code;
if (info != NULL) {
strncpy(pCmd->payload, info, pCmd->payloadLen);
}
}
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
SSqlCmd* pCmd = &pSql->cmd;
if (code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code);
pSql->res.code = code;
tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
registerSqlObj(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -523,13 +538,22 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->window.ekey;
tscAddIntoStreamList(pStream);
if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream);
pStream->fp(pStream->param, NULL, NULL);
return;
}
tscSetSlidingWindowInfo(pSql, pStream);
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;
registerSqlObj(pSql);
tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
......
......@@ -188,7 +188,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufFlush(output2);
tsBufDestroy(pSupporter1->pTSBuf);
pSupporter1->pTSBuf = NULL;
tsBufDestroy(pSupporter2->pTSBuf);
pSupporter2->pTSBuf = NULL;
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
......@@ -219,12 +221,9 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
assert (pSupporter->uid != 0);
taosGetTmpfilePath("join-", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
// todo handle error
if (pSupporter->f == NULL) {
tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
}
// do NOT create file here to reduce crash generated file left issue
pSupporter->f = NULL;
return pSupporter;
}
......@@ -244,12 +243,19 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
tscFieldInfoClear(&pSupporter->fieldsInfo);
if (pSupporter->pTSBuf != NULL) {
tsBufDestroy(pSupporter->pTSBuf);
pSupporter->pTSBuf = NULL;
}
unlink(pSupporter->path);
if (pSupporter->f != NULL) {
fclose(pSupporter->f);
unlink(pSupporter->path);
pSupporter->f = NULL;
}
if (pSupporter->pVgroupTables != NULL) {
taosArrayDestroy(pSupporter->pVgroupTables);
pSupporter->pVgroupTables = NULL;
......@@ -459,7 +465,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
// set the tag column id for executor to extract correct tag value
pExpr->param[0] = (tVariant) {.i64Key = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
pExpr->param[0] = (tVariant) {.i64 = colId, .nType = TSDB_DATA_TYPE_BIGINT, .nLen = sizeof(int64_t)};
pExpr->numOfParams = 1;
}
......@@ -526,6 +532,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj);
}
tscDestroyJoinSupporter(pSupporter);
}
// update the query time range according to the join results on timestamp
......@@ -642,7 +650,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId;
pExpr->param->i64 = tagColId;
pExpr->numOfParams = 1;
}
......@@ -921,6 +929,22 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
}
if (numOfRows > 0) { // write the compressed timestamp to disk file
if(pSupporter->f == NULL) {
pSupporter->f = fopen(pSupporter->path, "w");
if (pSupporter->f == NULL) {
tscError("%p failed to create tmp file:%s, reason:%s", pSql, pSupporter->path, strerror(errno));
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
}
fwrite(pRes->data, (size_t)pRes->numOfRows, 1, pSupporter->f);
fclose(pSupporter->f);
pSupporter->f = NULL;
......@@ -930,6 +954,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
......@@ -1520,7 +1547,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->param->i64Key = tagColId;
pExpr->param->i64 = tagColId;
pExpr->numOfParams = 1;
}
......@@ -2093,7 +2120,15 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
return pNew;
}
// todo there is are race condition in this function, while cancel is called by user.
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
// the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
// function while kill query by a user.
if (param == NULL) {
assert(code != TSDB_CODE_SUCCESS);
return;
}
SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSql;
......
......@@ -71,6 +71,7 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw) {
bool tscQueryTags(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
......@@ -314,7 +315,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
} else {
for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char* p = ((char**)pRes->urow)[i] + k * pInfo->field.bytes;
memcpy(p, &pInfo->pSqlExpr->param[1].i64Key, pInfo->field.bytes);
memcpy(p, &pInfo->pSqlExpr->param[1].i64, pInfo->field.bytes);
}
}
}
......@@ -458,12 +459,13 @@ void tscFreeRegisteredSqlObj(void *pSql) {
assert(RID_VALID(p->self));
tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1);
int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1);
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid);
}
void tscFreeSqlObj(SSqlObj* pSql) {
......
......@@ -36,6 +36,7 @@ extern int8_t tsEnableVnodeBak;
extern int8_t tsEnableTelemetryReporting;
extern char tsEmail[];
extern char tsArbitrator[];
extern int8_t tsArbOnline;
// common
extern int tsRpcTimer;
......@@ -190,7 +191,7 @@ extern int32_t monDebugFlag;
extern int32_t uDebugFlag;
extern int32_t rpcDebugFlag;
extern int32_t odbcDebugFlag;
extern int32_t qDebugFlag;
extern uint32_t qDebugFlag;
extern int32_t wDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
......
......@@ -28,7 +28,8 @@ typedef struct tVariant {
uint32_t nType;
int32_t nLen; // only used for string, for number, it is useless
union {
int64_t i64Key;
int64_t i64;
uint64_t u64;
double dKey;
char * pz;
wchar_t *wpz;
......@@ -36,9 +37,9 @@ typedef struct tVariant {
};
} tVariant;
void tVariantCreate(tVariant *pVar, SStrToken *token);
bool tVariantIsValid(tVariant *pVar);
void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t type);
void tVariantCreate(tVariant *pVar, SStrToken *token);
void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32_t type);
......
......@@ -41,6 +41,7 @@ int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 3;
int8_t tsEnableVnodeBak = 1;
int8_t tsEnableTelemetryReporting = 1;
int8_t tsArbOnline = 0;
char tsEmail[TSDB_FQDN_LEN] = {0};
// common
......@@ -218,7 +219,7 @@ int32_t odbcDebugFlag = 131;
int32_t httpDebugFlag = 131;
int32_t mqttDebugFlag = 131;
int32_t monDebugFlag = 131;
int32_t qDebugFlag = 131;
uint32_t qDebugFlag = 131;
int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131;
int32_t debugFlag = 0;
......
此差异已折叠。
此差异已折叠。
......@@ -19,7 +19,8 @@ using System.Runtime.InteropServices;
namespace TDengineDriver
{
enum TDengineDataType {
enum TDengineDataType
{
TSDB_DATA_TYPE_NULL = 0, // 1 bytes
TSDB_DATA_TYPE_BOOL = 1, // 1 bytes
TSDB_DATA_TYPE_TINYINT = 2, // 1 bytes
......@@ -81,19 +82,19 @@ namespace TDengineDriver
{
public const int TSDB_CODE_SUCCESS = 0;
[DllImport("taos.dll", EntryPoint = "taos_init", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_init", CallingConvention = CallingConvention.Cdecl)]
static extern public void Init();
[DllImport("taos.dll", EntryPoint = "taos_cleanup", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_cleanup", CallingConvention = CallingConvention.Cdecl)]
static extern public void Cleanup();
[DllImport("taos.dll", EntryPoint = "taos_options", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_options", CallingConvention = CallingConvention.Cdecl)]
static extern public void Options(int option, string value);
[DllImport("taos.dll", EntryPoint = "taos_connect", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_connect", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr Connect(string ip, string user, string password, string db, short port);
[DllImport("taos.dll", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr taos_errstr(IntPtr res);
static public string Error(IntPtr res)
{
......@@ -101,19 +102,19 @@ namespace TDengineDriver
return Marshal.PtrToStringAnsi(errPtr);
}
[DllImport("taos.dll", EntryPoint = "taos_errno", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_errno", CallingConvention = CallingConvention.Cdecl)]
static extern public int ErrorNo(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr Query(IntPtr conn, string sqlstr);
[DllImport("taos.dll", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.Cdecl)]
static extern public int AffectRows(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.Cdecl)]
static extern public int FieldCount(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr taos_fetch_fields(IntPtr res);
static public List<TDengineMeta> FetchFields(IntPtr res)
{
......@@ -142,13 +143,13 @@ namespace TDengineDriver
return metas;
}
[DllImport("taos.dll", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FetchRows(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FreeResult(IntPtr res);
[DllImport("taos.dll", EntryPoint = "taos_close", CallingConvention = CallingConvention.Cdecl)]
[DllImport("taos", EntryPoint = "taos_close", CallingConvention = CallingConvention.Cdecl)]
static extern public int Close(IntPtr taos);
}
}
......@@ -56,6 +56,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- for restful -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
......@@ -73,7 +79,14 @@
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
......
......@@ -46,6 +46,9 @@ public abstract class TSDBConstants {
public static final int TSDB_DATA_TYPE_TIMESTAMP = 9;
public static final int TSDB_DATA_TYPE_NCHAR = 10;
// nchar field's max length
public static final int maxFieldSize = 16 * 1024;
public static String WrapErrMsg(String msg) {
return "TDengine Error: " + msg;
}
......
package com.taosdata.jdbc.rs;
import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.TSDBDriver;
import java.sql.*;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
public class RestfulConnection implements Connection {
private static final String CONNECTION_IS_CLOSED = "connection is closed.";
private static final String AUTO_COMMIT_IS_TRUE = "auto commit is true";
private final String host;
private final int port;
private final Properties props;
private final String database;
private volatile String database;
private final String url;
/******************************************************/
private boolean isClosed;
private DatabaseMetaData metadata;
private Map<String, Class<?>> typeMap;
private Properties clientInfoProps = new Properties();
public RestfulConnection(String host, String port, Properties props, String database, String url) {
this.host = host;
......@@ -21,280 +31,424 @@ public class RestfulConnection implements Connection {
this.props = props;
this.database = database;
this.url = url;
this.metadata = new RestfulDatabaseMetaData(url, props.getProperty(TSDBDriver.PROPERTY_KEY_USER), this);
}
@Override
public Statement createStatement() throws SQLException {
if (isClosed())
throw new SQLException(TSDBConstants.WrapErrMsg("restful TDengine connection is closed."));
throw new SQLException(CONNECTION_IS_CLOSED);
return new RestfulStatement(this, database);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//TODO: prepareStatement
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//nothing did
return sql;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (!autoCommit)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return true;
}
@Override
public void commit() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(AUTO_COMMIT_IS_TRUE);
//nothing to do
}
@Override
public void rollback() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(AUTO_COMMIT_IS_TRUE);
//nothing to do
}
@Override
public void close() throws SQLException {
if (isClosed)
return;
//TODO: release all resources
isClosed = true;
}
@Override
public boolean isClosed() throws SQLException {
return false;
return isClosed;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
//TODO: RestfulDatabaseMetaData is not implemented
return new RestfulDatabaseMetaData();
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return this.metadata;
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
// nothing to do
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return true;
}
@Override
public void setCatalog(String catalog) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.database = catalog;
}
}
@Override
public String getCatalog() throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return this.database;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
//transaction is not supported
throw new SQLFeatureNotSupportedException("transactions are not supported");
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
switch (level) {
case Connection.TRANSACTION_NONE:
break;
case Connection.TRANSACTION_READ_UNCOMMITTED:
case Connection.TRANSACTION_READ_COMMITTED:
case Connection.TRANSACTION_REPEATABLE_READ:
case Connection.TRANSACTION_SERIALIZABLE:
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
default:
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
}
}
/**
*
*/
@Override
public int getTransactionIsolation() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//Connection.TRANSACTION_NONE specifies that transactions are not supported.
return Connection.TRANSACTION_NONE;
}
@Override
public SQLWarning getWarnings() throws SQLException {
//TODO: getWarnings not implemented
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return null;
}
@Override
public void clearWarnings() throws SQLException {
throw new SQLFeatureNotSupportedException("clearWarnings not supported.");
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//nothing to do
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY) {
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return createStatement();
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.INVALID_VARIABLES);
return this.prepareStatement(sql);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.INVALID_VARIABLES);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
if (this.typeMap == null) {
this.typeMap = new HashMap<>();
}
return this.typeMap;
}
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.typeMap = map;
}
}
@Override
public void setHoldability(int holdability) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (holdability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public int getHoldability() throws SQLException {
return 0;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return ResultSet.HOLD_CURSORS_OVER_COMMIT;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
if (resultSetHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return createStatement(resultSetType, resultSetConcurrency);
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
if (resultSetHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return prepareStatement(sql, resultSetType, resultSetConcurrency);
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Clob createClob() throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Blob createBlob() throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public NClob createNClob() throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public SQLXML createSQLXML() throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
if (timeout < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
// TODO:
/* The driver shall submit a query on the connection or use some other mechanism that positively verifies
the connection is still valid when this method is called.*/
return !isClosed();
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
if (isClosed)
throw new SQLClientInfoException();
clientInfoProps.setProperty(name, value);
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
if (isClosed)
throw new SQLClientInfoException();
for (Enumeration<Object> enumer = properties.keys(); enumer.hasMoreElements(); ) {
String name = (String) enumer.nextElement();
clientInfoProps.put(name, properties.getProperty(name));
}
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
if (isClosed)
throw new SQLClientInfoException();
return clientInfoProps.getProperty(name);
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
if (isClosed)
throw new SQLClientInfoException();
return clientInfoProps;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
//TODO: not supported
throw new SQLFeatureNotSupportedException();
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public void setSchema(String schema) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.database = schema;
}
}
@Override
public String getSchema() throws SQLException {
return null;
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return this.database;
}
@Override
public void abort(Executor executor) throws SQLException {
if (executor == null) {
throw new SQLException("Executor can not be null");
}
executor.execute(() -> {
try {
close();
} catch (SQLException e) {
e.printStackTrace();
}
});
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
@Override
public int getNetworkTimeout() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return 0;
}
......
......@@ -33,7 +33,7 @@ public class RestfulDriver extends AbstractTaosDriver {
return null;
Properties props = parseURL(url, info);
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST);
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.containsKey(TSDBDriver.PROPERTY_KEY_DBNAME) ? props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME) : null;
......
......@@ -2,13 +2,15 @@ package com.taosdata.jdbc.rs;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.ArrayList;
public class RestfulResultSetMetaData implements ResultSetMetaData {
private List<String> fields;
private final String database;
private ArrayList<RestfulResultSet.Field> fields;
public RestfulResultSetMetaData(List<String> fields) {
public RestfulResultSetMetaData(String database, ArrayList<RestfulResultSet.Field> fields) {
this.database = database;
this.fields = fields;
}
......@@ -24,6 +26,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override
public boolean isCaseSensitive(int column) throws SQLException {
//TODO
return false;
}
......@@ -39,7 +42,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override
public int isNullable(int column) throws SQLException {
return 0;
return ResultSetMetaData.columnNullable;
}
@Override
......@@ -54,7 +57,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override
public String getColumnLabel(int column) throws SQLException {
return fields.get(column - 1);
return fields.get(column - 1).name;
}
@Override
......@@ -64,7 +67,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override
public String getSchemaName(int column) throws SQLException {
return null;
return this.database;
}
@Override
......@@ -84,7 +87,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override
public String getCatalogName(int column) throws SQLException {
return null;
return this.database;
}
@Override
......
......@@ -17,6 +17,8 @@ import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import java.nio.charset.Charset;
public class HttpClientPoolUtil {
public static PoolingHttpClientConnectionManager cm = null;
......@@ -94,7 +96,9 @@ public class HttpClientPoolUtil {
initPools();
}
method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
method.setEntity(new StringEntity(data));
method.setHeader("Authorization", "Basic cm9vdDp0YW9zZGF0YQ==");
method.setHeader("Content-Type", "text/plain");
method.setEntity(new StringEntity(data, Charset.forName("UTF-8")));
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity();
......@@ -105,26 +109,13 @@ public class HttpClientPoolUtil {
if (method != null) {
method.abort();
}
// e.printStackTrace();
// logger.error("execute post request exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("execute post request exception, url:"
+ uri + ", exception:" + e.toString() +
", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
new Exception("execute post request exception, url:" + uri + ", exception:" + e.toString() + ", cost time(ms):" + (System.currentTimeMillis() - startTime)).printStackTrace();
} finally {
if (httpEntity != null) {
try {
EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) {
// e.printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception(
"close response exception, url:" + uri +
", exception:" + e.toString()
+ ", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
new Exception("close response exception, url:" + uri + ", exception:" + e.toString() + ", cost time(ms):" + (System.currentTimeMillis() - startTime)).printStackTrace();
}
}
}
......
......@@ -15,14 +15,12 @@
package com.taosdata.jdbc.utils;
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBJNIConnector;
import java.sql.Connection;
import java.sql.SQLException;
public class SqlSyntaxValidator {
private static final String[] updateSQL = {"insert", "update", "delete", "create", "alter", "drop", "show", "describe", "use"};
private static final String[] updateSQL = {"insert", "update", "delete", "create", "alter", "drop", "show", "describe", "use", "import"};
private static final String[] querySQL = {"select"};
private TSDBConnection tsdbConnection;
......@@ -31,22 +29,6 @@ public class SqlSyntaxValidator {
this.tsdbConnection = (TSDBConnection) connection;
}
public boolean validateSqlSyntax(String sql) throws SQLException {
boolean res = false;
if (tsdbConnection == null || tsdbConnection.isClosed()) {
throw new SQLException("invalid connection");
} else {
TSDBJNIConnector jniConnector = tsdbConnection.getConnection();
if (jniConnector == null) {
throw new SQLException("jniConnector is null");
} else {
res = jniConnector.validateCreateTableSql(sql);
}
}
return res;
}
public static boolean isValidForExecuteUpdate(String sql) {
for (String prefix : updateSQL) {
if (sql.trim().toLowerCase().startsWith(prefix))
......@@ -56,18 +38,28 @@ public class SqlSyntaxValidator {
}
public static boolean isUseSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[8]) || sql.trim().toLowerCase().matches("create\\s*database.*") || sql.toLowerCase().toLowerCase().matches("drop\\s*database.*");
return sql.trim().toLowerCase().startsWith("use") || sql.trim().toLowerCase().matches("create\\s*database.*") || sql.toLowerCase().toLowerCase().matches("drop\\s*database.*");
}
public static boolean isUpdateSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[1]);
public static boolean isShowSql(String sql) {
return sql.trim().toLowerCase().startsWith("show");
}
public static boolean isDescribeSql(String sql) {
return sql.trim().toLowerCase().startsWith("describe");
}
public static boolean isInsertSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[0]);
return sql.trim().toLowerCase().startsWith("insert") || sql.trim().toLowerCase().startsWith("import");
}
public static boolean isSelectSql(String sql) {
return sql.trim().toLowerCase().startsWith(querySQL[0]);
return sql.trim().toLowerCase().startsWith("select");
}
public static boolean isShowDatabaseSql(String sql) {
return sql.trim().toLowerCase().matches("show\\s*databases");
}
}
......@@ -7,7 +7,7 @@ import org.junit.Test;
import java.sql.*;
import java.util.Properties;
public class DatabaseMetaDataTest extends BaseTest {
public class DatabaseMetaDataTest {
static Connection connection = null;
static PreparedStatement statement = null;
static String dbName = "test";
......@@ -23,20 +23,21 @@ public class DatabaseMetaDataTest extends BaseTest {
}
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/", properties);
String sql = "drop database if exists " + dbName;
statement = (TSDBPreparedStatement) connection.prepareStatement(sql);
statement = connection.prepareStatement(sql);
statement.executeUpdate("create database if not exists " + dbName);
statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
}
@Test
public void testMetaDataTest() throws SQLException {
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet resultSet = databaseMetaData.getTables(dbName, "t*", "t*", new String[]{"t"});
while (resultSet.next()) {
......@@ -180,7 +181,7 @@ public class DatabaseMetaDataTest extends BaseTest {
databaseMetaData.getCatalogs();
// databaseMetaData.getTableTypes();
databaseMetaData.getColumns("", "", "", "");
databaseMetaData.getColumns(dbName, "", tName, "");
databaseMetaData.getColumnPrivileges("", "", "", "");
databaseMetaData.getTablePrivileges("", "", "");
databaseMetaData.getBestRowIdentifier("", "", "", 0, false);
......
package com.taosdata.jdbc.rs;
import org.junit.*;
import org.junit.runners.MethodSorters;
......
......@@ -5,10 +5,6 @@ import org.junit.Test;
public class SqlSyntaxValidatorTest {
@Test
public void validateSqlSyntax() {
}
@Test
public void isSelectSQL() {
Assert.assertTrue(SqlSyntaxValidator.isSelectSql("select * from test.weather"));
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册