提交 e82a6c84 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1925_new

...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.0.12.0") SET(TD_VER_NUMBER "2.0.13.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
TDengine采用关系型数据模型,需要建库、建表。因此对于一个具体的应用场景,需要考虑库的设计,超级表和普通表的设计。本节不讨论细致的语法规则,只介绍概念。 TDengine采用关系型数据模型,需要建库、建表。因此对于一个具体的应用场景,需要考虑库的设计,超级表和普通表的设计。本节不讨论细致的语法规则,只介绍概念。
关于数据建模请参考<a href="https://www.taosdata.com/blog/2020/11/11/1945.html">视频教程</a>
## 创建库 ## 创建库
不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为让各种场景下TDengine都能最大效率的工作,TDengine建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除SQL标准的选项外,应用还可以指定保留时长、副本数、内存块个数、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如: 不同类型的数据采集点往往具有不同的数据特征,包括数据采集频率的高低,数据保留时间的长短,副本的数目,数据块的大小,是否允许更新数据等等。为让各种场景下TDengine都能最大效率的工作,TDengine建议将不同数据特征的表创建在不同的库里,因为每个库可以配置不同的存储策略。创建一个库时,除SQL标准的选项外,应用还可以指定保留时长、副本数、内存块个数、时间精度、文件块里最大最小记录条数、是否压缩、一个数据文件覆盖的天数等多种参数。比如:
...@@ -60,4 +62,3 @@ TDengine支持多列模型,只要物理量是一个数据采集点同时采集 ...@@ -60,4 +62,3 @@ TDengine支持多列模型,只要物理量是一个数据采集点同时采集
TDengine建议尽可能采用多列模型,因为插入效率以及存储效率更高。但对于有些场景,一个采集点的采集量的种类经常变化,这个时候,如果采用多列模型,就需要频繁修改超级表的结构定义,让应用变的复杂,这个时候,采用单列模型会显得简单。 TDengine建议尽可能采用多列模型,因为插入效率以及存储效率更高。但对于有些场景,一个采集点的采集量的种类经常变化,这个时候,如果采用多列模型,就需要频繁修改超级表的结构定义,让应用变的复杂,这个时候,采用单列模型会显得简单。
关于数据建模请参考<a href="https://www.taosdata.com/blog/2020/11/11/1945.html">视频教程</a>
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
TDengine的集群管理极其简单,除添加和删除节点需要人工干预之外,其他全部是自动完成,最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。 TDengine的集群管理极其简单,除添加和删除节点需要人工干预之外,其他全部是自动完成,最大程度的降低了运维的工作量。本章对集群管理的操作做详细的描述。
关于集群搭建请参考<a href="https://www.taosdata.com/blog/2020/11/11/1961.html">视频教程</a>
## 准备工作 ## 准备工作
**第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS,请联系网络管理员在DNS上做好相关配置】 **第零步**:规划集群所有物理节点的FQDN,将规划好的FQDN分别添加到每个物理节点的/etc/hostname;修改每个物理节点的/etc/hosts,将所有集群物理节点的IP与FQDN的对应添加好。【如部署了DNS,请联系网络管理员在DNS上做好相关配置】
...@@ -227,4 +229,3 @@ SHOW MNODES; ...@@ -227,4 +229,3 @@ SHOW MNODES;
TDengine提供一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。如果副本数为奇数,即使配置了arbitrator, 系统也不会去建立连接。 TDengine提供一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。如果副本数为奇数,即使配置了arbitrator, 系统也不会去建立连接。
关于集群搭建请参考<a href="https://www.taosdata.com/blog/2020/11/11/1961.html">视频教程</a>
name: tdengine name: tdengine
base: core18 base: core18
version: '2.0.12.0' version: '2.0.13.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
...@@ -72,7 +72,7 @@ parts: ...@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd - usr/bin/taosd
- usr/bin/taos - usr/bin/taos
- usr/bin/taosdemo - usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.12.0 - usr/lib/libtaos.so.2.0.13.0
- usr/lib/libtaos.so.1 - usr/lib/libtaos.so.1
- usr/lib/libtaos.so - usr/lib/libtaos.so
......
...@@ -75,11 +75,11 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC ...@@ -75,11 +75,11 @@ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SC
static int32_t convertFunctionId(int32_t optr, int16_t* functionId); static int32_t convertFunctionId(int32_t optr, int16_t* functionId);
static uint8_t convertOptr(SStrToken *pToken); static uint8_t convertOptr(SStrToken *pToken);
static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery); static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery);
static bool validateIpAddress(const char* ip, size_t size); static bool validateIpAddress(const char* ip, size_t size);
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo); static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery); static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery);
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd); static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
...@@ -1475,7 +1475,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) { ...@@ -1475,7 +1475,7 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
} }
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery) { int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
assert(pSelection != NULL && pCmd != NULL); assert(pSelection != NULL && pCmd != NULL);
const char* msg2 = "functions can not be mixed up"; const char* msg2 = "functions can not be mixed up";
...@@ -1531,7 +1531,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1531,7 +1531,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
addPrimaryTsColIntoResult(pQueryInfo); addPrimaryTsColIntoResult(pQueryInfo);
} }
if (!functionCompatibleCheck(pQueryInfo, joinQuery)) { if (!functionCompatibleCheck(pQueryInfo, joinQuery, intervalQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -2810,7 +2810,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) ...@@ -2810,7 +2810,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
return false; return false;
} }
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) { static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool intervalQuery) {
int32_t startIdx = 0; int32_t startIdx = 0;
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
...@@ -2826,6 +2826,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) { ...@@ -2826,6 +2826,10 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId]; int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery)) {
return false;
}
// diff function cannot be executed with other function // diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions // arithmetic function can be executed with other arithmetic functions
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscSqlExprNumOfExprs(pQueryInfo);
...@@ -2850,7 +2854,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) { ...@@ -2850,7 +2854,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery) {
} }
} }
if (functionId == TSDB_FUNC_LAST_ROW && joinQuery) { if (functionId == TSDB_FUNC_LAST_ROW && (joinQuery || intervalQuery)) {
return false; return false;
} }
} }
...@@ -6320,7 +6324,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6320,7 +6324,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
} }
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable, false) != TSDB_CODE_SUCCESS) { if (parseSelectClause(&pSql->cmd, 0, pQuerySql->pSelection, isSTable, false, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -6565,7 +6569,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -6565,7 +6569,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
int32_t joinQuery = (pQuerySql->from != NULL && taosArrayGetSize(pQuerySql->from) > 2); int32_t joinQuery = (pQuerySql->from != NULL && taosArrayGetSize(pQuerySql->from) > 2);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery) != TSDB_CODE_SUCCESS) { int32_t intervalQuery = !(pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0);
if (parseSelectClause(pCmd, index, pQuerySql->pSelection, isSTable, joinQuery, intervalQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
......
...@@ -458,12 +458,13 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -458,12 +458,13 @@ void tscFreeRegisteredSqlObj(void *pSql) {
assert(RID_VALID(p->self)); assert(RID_VALID(p->self));
tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1);
int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1); int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1);
tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total); tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total);
tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid);
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
......
...@@ -56,6 +56,12 @@ ...@@ -56,6 +56,12 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- for restful --> <!-- for restful -->
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
...@@ -73,7 +79,14 @@ ...@@ -73,7 +79,14 @@
<version>1.2.58</version> <version>1.2.58</version>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
......
...@@ -19,68 +19,71 @@ import java.util.Map; ...@@ -19,68 +19,71 @@ import java.util.Map;
public abstract class TSDBConstants { public abstract class TSDBConstants {
public static final String DEFAULT_PORT = "6200"; public static final String DEFAULT_PORT = "6200";
public static final String UNSUPPORT_METHOD_EXCEPTIONZ_MSG = "this operation is NOT supported currently!"; public static final String UNSUPPORT_METHOD_EXCEPTIONZ_MSG = "this operation is NOT supported currently!";
public static final String INVALID_VARIABLES = "invalid variables"; public static final String INVALID_VARIABLES = "invalid variables";
public static Map<Integer, String> DATATYPE_MAP = null; public static Map<Integer, String> DATATYPE_MAP = null;
public static final long JNI_NULL_POINTER = 0L; public static final long JNI_NULL_POINTER = 0L;
public static final int JNI_SUCCESS = 0; public static final int JNI_SUCCESS = 0;
public static final int JNI_TDENGINE_ERROR = -1; public static final int JNI_TDENGINE_ERROR = -1;
public static final int JNI_CONNECTION_NULL = -2; public static final int JNI_CONNECTION_NULL = -2;
public static final int JNI_RESULT_SET_NULL = -3; public static final int JNI_RESULT_SET_NULL = -3;
public static final int JNI_NUM_OF_FIELDS_0 = -4; public static final int JNI_NUM_OF_FIELDS_0 = -4;
public static final int JNI_SQL_NULL = -5; public static final int JNI_SQL_NULL = -5;
public static final int JNI_FETCH_END = -6; public static final int JNI_FETCH_END = -6;
public static final int TSDB_DATA_TYPE_NULL = 0;
public static final int TSDB_DATA_TYPE_BOOL = 1;
public static final int TSDB_DATA_TYPE_TINYINT = 2;
public static final int TSDB_DATA_TYPE_SMALLINT = 3;
public static final int TSDB_DATA_TYPE_INT = 4;
public static final int TSDB_DATA_TYPE_BIGINT = 5;
public static final int TSDB_DATA_TYPE_FLOAT = 6;
public static final int TSDB_DATA_TYPE_DOUBLE = 7;
public static final int TSDB_DATA_TYPE_BINARY = 8;
public static final int TSDB_DATA_TYPE_TIMESTAMP = 9;
public static final int TSDB_DATA_TYPE_NCHAR = 10;
public static String WrapErrMsg(String msg) {
return "TDengine Error: " + msg;
}
public static String FixErrMsg(int code) { public static final int TSDB_DATA_TYPE_NULL = 0;
switch (code) { public static final int TSDB_DATA_TYPE_BOOL = 1;
case JNI_TDENGINE_ERROR: public static final int TSDB_DATA_TYPE_TINYINT = 2;
return WrapErrMsg("internal error of database!"); public static final int TSDB_DATA_TYPE_SMALLINT = 3;
case JNI_CONNECTION_NULL: public static final int TSDB_DATA_TYPE_INT = 4;
return WrapErrMsg("invalid tdengine connection!"); public static final int TSDB_DATA_TYPE_BIGINT = 5;
case JNI_RESULT_SET_NULL: public static final int TSDB_DATA_TYPE_FLOAT = 6;
return WrapErrMsg("invalid resultset pointer!"); public static final int TSDB_DATA_TYPE_DOUBLE = 7;
case JNI_NUM_OF_FIELDS_0: public static final int TSDB_DATA_TYPE_BINARY = 8;
return WrapErrMsg("invalid num of fields!"); public static final int TSDB_DATA_TYPE_TIMESTAMP = 9;
case JNI_SQL_NULL: public static final int TSDB_DATA_TYPE_NCHAR = 10;
return WrapErrMsg("can't execute empty sql!");
case JNI_FETCH_END:
return WrapErrMsg("fetch to the end of resultset");
default:
break;
}
return WrapErrMsg("unkown error!");
}
static { // nchar field's max length
DATATYPE_MAP = new HashMap<Integer, String>(); public static final int maxFieldSize = 16 * 1024;
DATATYPE_MAP.put(1, "BOOL");
DATATYPE_MAP.put(2, "TINYINT"); public static String WrapErrMsg(String msg) {
DATATYPE_MAP.put(3, "SMALLINT"); return "TDengine Error: " + msg;
DATATYPE_MAP.put(4, "INT"); }
DATATYPE_MAP.put(5, "BIGINT");
DATATYPE_MAP.put(6, "FLOAT"); public static String FixErrMsg(int code) {
DATATYPE_MAP.put(7, "DOUBLE"); switch (code) {
DATATYPE_MAP.put(8, "BINARY"); case JNI_TDENGINE_ERROR:
DATATYPE_MAP.put(9, "TIMESTAMP"); return WrapErrMsg("internal error of database!");
DATATYPE_MAP.put(10, "NCHAR"); case JNI_CONNECTION_NULL:
} return WrapErrMsg("invalid tdengine connection!");
case JNI_RESULT_SET_NULL:
return WrapErrMsg("invalid resultset pointer!");
case JNI_NUM_OF_FIELDS_0:
return WrapErrMsg("invalid num of fields!");
case JNI_SQL_NULL:
return WrapErrMsg("can't execute empty sql!");
case JNI_FETCH_END:
return WrapErrMsg("fetch to the end of resultset");
default:
break;
}
return WrapErrMsg("unkown error!");
}
static {
DATATYPE_MAP = new HashMap<Integer, String>();
DATATYPE_MAP.put(1, "BOOL");
DATATYPE_MAP.put(2, "TINYINT");
DATATYPE_MAP.put(3, "SMALLINT");
DATATYPE_MAP.put(4, "INT");
DATATYPE_MAP.put(5, "BIGINT");
DATATYPE_MAP.put(6, "FLOAT");
DATATYPE_MAP.put(7, "DOUBLE");
DATATYPE_MAP.put(8, "BINARY");
DATATYPE_MAP.put(9, "TIMESTAMP");
DATATYPE_MAP.put(10, "NCHAR");
}
} }
package com.taosdata.jdbc.rs; package com.taosdata.jdbc.rs;
import com.taosdata.jdbc.TSDBConstants; import com.taosdata.jdbc.TSDBConstants;
import com.taosdata.jdbc.TSDBDriver;
import java.sql.*; import java.sql.*;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
public class RestfulConnection implements Connection { public class RestfulConnection implements Connection {
private static final String CONNECTION_IS_CLOSED = "connection is closed.";
private static final String AUTO_COMMIT_IS_TRUE = "auto commit is true";
private final String host; private final String host;
private final int port; private final int port;
private final Properties props; private final Properties props;
private final String database; private volatile String database;
private final String url; private final String url;
/******************************************************/
private boolean isClosed;
private DatabaseMetaData metadata;
private Map<String, Class<?>> typeMap;
private Properties clientInfoProps = new Properties();
public RestfulConnection(String host, String port, Properties props, String database, String url) { public RestfulConnection(String host, String port, Properties props, String database, String url) {
this.host = host; this.host = host;
...@@ -21,280 +31,424 @@ public class RestfulConnection implements Connection { ...@@ -21,280 +31,424 @@ public class RestfulConnection implements Connection {
this.props = props; this.props = props;
this.database = database; this.database = database;
this.url = url; this.url = url;
this.metadata = new RestfulDatabaseMetaData(url, props.getProperty(TSDBDriver.PROPERTY_KEY_USER), this);
} }
@Override @Override
public Statement createStatement() throws SQLException { public Statement createStatement() throws SQLException {
if (isClosed()) if (isClosed())
throw new SQLException(TSDBConstants.WrapErrMsg("restful TDengine connection is closed.")); throw new SQLException(CONNECTION_IS_CLOSED);
return new RestfulStatement(this, database); return new RestfulStatement(this, database);
} }
@Override @Override
public PreparedStatement prepareStatement(String sql) throws SQLException { public PreparedStatement prepareStatement(String sql) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//TODO: prepareStatement
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public CallableStatement prepareCall(String sql) throws SQLException { public CallableStatement prepareCall(String sql) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public String nativeSQL(String sql) throws SQLException { public String nativeSQL(String sql) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//nothing did
return sql;
} }
@Override @Override
public void setAutoCommit(boolean autoCommit) throws SQLException { public void setAutoCommit(boolean autoCommit) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (!autoCommit)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public boolean getAutoCommit() throws SQLException { public boolean getAutoCommit() throws SQLException {
return false; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return true;
} }
@Override @Override
public void commit() throws SQLException { public void commit() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(AUTO_COMMIT_IS_TRUE);
//nothing to do
} }
@Override @Override
public void rollback() throws SQLException { public void rollback() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(AUTO_COMMIT_IS_TRUE);
//nothing to do
} }
@Override @Override
public void close() throws SQLException { public void close() throws SQLException {
if (isClosed)
return;
//TODO: release all resources
isClosed = true;
} }
@Override @Override
public boolean isClosed() throws SQLException { public boolean isClosed() throws SQLException {
return false; return isClosed;
} }
@Override @Override
public DatabaseMetaData getMetaData() throws SQLException { public DatabaseMetaData getMetaData() throws SQLException {
//TODO: RestfulDatabaseMetaData is not implemented if (isClosed())
return new RestfulDatabaseMetaData(); throw new SQLException(CONNECTION_IS_CLOSED);
return this.metadata;
} }
@Override @Override
public void setReadOnly(boolean readOnly) throws SQLException { public void setReadOnly(boolean readOnly) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
// nothing to do
} }
@Override @Override
public boolean isReadOnly() throws SQLException { public boolean isReadOnly() throws SQLException {
return false; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return true;
} }
@Override @Override
public void setCatalog(String catalog) throws SQLException { public void setCatalog(String catalog) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.database = catalog;
}
} }
@Override @Override
public String getCatalog() throws SQLException { public String getCatalog() throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return this.database;
} }
@Override @Override
public void setTransactionIsolation(int level) throws SQLException { public void setTransactionIsolation(int level) throws SQLException {
//transaction is not supported if (isClosed())
throw new SQLFeatureNotSupportedException("transactions are not supported"); throw new SQLException(CONNECTION_IS_CLOSED);
switch (level) {
case Connection.TRANSACTION_NONE:
break;
case Connection.TRANSACTION_READ_UNCOMMITTED:
case Connection.TRANSACTION_READ_COMMITTED:
case Connection.TRANSACTION_REPEATABLE_READ:
case Connection.TRANSACTION_SERIALIZABLE:
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
default:
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
}
} }
/**
*
*/
@Override @Override
public int getTransactionIsolation() throws SQLException { public int getTransactionIsolation() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//Connection.TRANSACTION_NONE specifies that transactions are not supported. //Connection.TRANSACTION_NONE specifies that transactions are not supported.
return Connection.TRANSACTION_NONE; return Connection.TRANSACTION_NONE;
} }
@Override @Override
public SQLWarning getWarnings() throws SQLException { public SQLWarning getWarnings() throws SQLException {
//TODO: getWarnings not implemented if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return null; return null;
} }
@Override @Override
public void clearWarnings() throws SQLException { public void clearWarnings() throws SQLException {
throw new SQLFeatureNotSupportedException("clearWarnings not supported."); if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
//nothing to do
} }
@Override @Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY) {
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
if (resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return createStatement();
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.INVALID_VARIABLES);
return this.prepareStatement(sql);
} }
@Override @Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY)
throw new SQLFeatureNotSupportedException(TSDBConstants.INVALID_VARIABLES);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public Map<String, Class<?>> getTypeMap() throws SQLException { public Map<String, Class<?>> getTypeMap() throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
if (this.typeMap == null) {
this.typeMap = new HashMap<>();
}
return this.typeMap;
}
} }
@Override @Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException { public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.typeMap = map;
}
} }
@Override @Override
public void setHoldability(int holdability) throws SQLException { public void setHoldability(int holdability) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (holdability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int getHoldability() throws SQLException { public int getHoldability() throws SQLException {
return 0; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return ResultSet.HOLD_CURSORS_OVER_COMMIT;
} }
@Override @Override
public Savepoint setSavepoint() throws SQLException { public Savepoint setSavepoint() throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public Savepoint setSavepoint(String name) throws SQLException { public Savepoint setSavepoint(String name) throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public void rollback(Savepoint savepoint) throws SQLException { public void rollback(Savepoint savepoint) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
if (getAutoCommit())
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException { public void releaseSavepoint(Savepoint savepoint) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null; if (resultSetHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return createStatement(resultSetType, resultSetConcurrency);
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null; if (resultSetHoldability != ResultSet.HOLD_CURSORS_OVER_COMMIT)
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
return prepareStatement(sql, resultSetType, resultSetConcurrency);
} }
@Override @Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public Clob createClob() throws SQLException { public Clob createClob() throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public Blob createBlob() throws SQLException { public Blob createBlob() throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public NClob createNClob() throws SQLException { public NClob createNClob() throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public SQLXML createSQLXML() throws SQLException { public SQLXML createSQLXML() throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public boolean isValid(int timeout) throws SQLException { public boolean isValid(int timeout) throws SQLException {
return false; if (timeout < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
// TODO:
/* The driver shall submit a query on the connection or use some other mechanism that positively verifies
the connection is still valid when this method is called.*/
return !isClosed();
} }
@Override @Override
public void setClientInfo(String name, String value) throws SQLClientInfoException { public void setClientInfo(String name, String value) throws SQLClientInfoException {
if (isClosed)
throw new SQLClientInfoException();
clientInfoProps.setProperty(name, value);
} }
@Override @Override
public void setClientInfo(Properties properties) throws SQLClientInfoException { public void setClientInfo(Properties properties) throws SQLClientInfoException {
if (isClosed)
throw new SQLClientInfoException();
for (Enumeration<Object> enumer = properties.keys(); enumer.hasMoreElements(); ) {
String name = (String) enumer.nextElement();
clientInfoProps.put(name, properties.getProperty(name));
}
} }
@Override @Override
public String getClientInfo(String name) throws SQLException { public String getClientInfo(String name) throws SQLException {
return null; if (isClosed)
throw new SQLClientInfoException();
return clientInfoProps.getProperty(name);
} }
@Override @Override
public Properties getClientInfo() throws SQLException { public Properties getClientInfo() throws SQLException {
return null; if (isClosed)
throw new SQLClientInfoException();
return clientInfoProps;
} }
@Override @Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException { public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException { public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
//TODO: not supported throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
throw new SQLFeatureNotSupportedException();
} }
@Override @Override
public void setSchema(String schema) throws SQLException { public void setSchema(String schema) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
synchronized (RestfulConnection.class) {
this.database = schema;
}
} }
@Override @Override
public String getSchema() throws SQLException { public String getSchema() throws SQLException {
return null; if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return this.database;
} }
@Override @Override
public void abort(Executor executor) throws SQLException { public void abort(Executor executor) throws SQLException {
if (executor == null) {
throw new SQLException("Executor can not be null");
}
executor.execute(() -> {
try {
close();
} catch (SQLException e) {
e.printStackTrace();
}
});
} }
@Override @Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int getNetworkTimeout() throws SQLException { public int getNetworkTimeout() throws SQLException {
if (isClosed())
throw new SQLException(CONNECTION_IS_CLOSED);
return 0; return 0;
} }
......
...@@ -33,7 +33,7 @@ public class RestfulDriver extends AbstractTaosDriver { ...@@ -33,7 +33,7 @@ public class RestfulDriver extends AbstractTaosDriver {
return null; return null;
Properties props = parseURL(url, info); Properties props = parseURL(url, info);
String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost"); String host = props.getProperty(TSDBDriver.PROPERTY_KEY_HOST);
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041"); String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.containsKey(TSDBDriver.PROPERTY_KEY_DBNAME) ? props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME) : null; String database = props.containsKey(TSDBDriver.PROPERTY_KEY_DBNAME) ? props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME) : null;
......
...@@ -2,13 +2,15 @@ package com.taosdata.jdbc.rs; ...@@ -2,13 +2,15 @@ package com.taosdata.jdbc.rs;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.ArrayList;
public class RestfulResultSetMetaData implements ResultSetMetaData { public class RestfulResultSetMetaData implements ResultSetMetaData {
private List<String> fields; private final String database;
private ArrayList<RestfulResultSet.Field> fields;
public RestfulResultSetMetaData(List<String> fields) { public RestfulResultSetMetaData(String database, ArrayList<RestfulResultSet.Field> fields) {
this.database = database;
this.fields = fields; this.fields = fields;
} }
...@@ -24,6 +26,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData { ...@@ -24,6 +26,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override @Override
public boolean isCaseSensitive(int column) throws SQLException { public boolean isCaseSensitive(int column) throws SQLException {
//TODO
return false; return false;
} }
...@@ -39,7 +42,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData { ...@@ -39,7 +42,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override @Override
public int isNullable(int column) throws SQLException { public int isNullable(int column) throws SQLException {
return 0; return ResultSetMetaData.columnNullable;
} }
@Override @Override
...@@ -54,7 +57,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData { ...@@ -54,7 +57,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override @Override
public String getColumnLabel(int column) throws SQLException { public String getColumnLabel(int column) throws SQLException {
return fields.get(column - 1); return fields.get(column - 1).name;
} }
@Override @Override
...@@ -64,7 +67,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData { ...@@ -64,7 +67,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override @Override
public String getSchemaName(int column) throws SQLException { public String getSchemaName(int column) throws SQLException {
return null; return this.database;
} }
@Override @Override
...@@ -84,7 +87,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData { ...@@ -84,7 +87,7 @@ public class RestfulResultSetMetaData implements ResultSetMetaData {
@Override @Override
public String getCatalogName(int column) throws SQLException { public String getCatalogName(int column) throws SQLException {
return null; return this.database;
} }
@Override @Override
......
...@@ -7,20 +7,60 @@ import com.taosdata.jdbc.rs.util.HttpClientPoolUtil; ...@@ -7,20 +7,60 @@ import com.taosdata.jdbc.rs.util.HttpClientPoolUtil;
import com.taosdata.jdbc.utils.SqlSyntaxValidator; import com.taosdata.jdbc.utils.SqlSyntaxValidator;
import java.sql.*; import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
public class RestfulStatement implements Statement { public class RestfulStatement implements Statement {
private static final String STATEMENT_CLOSED = "Statement already closed.";
private boolean closed; private boolean closed;
private String database; private String database;
private final RestfulConnection conn; private final RestfulConnection conn;
public RestfulStatement(RestfulConnection c, String database) { private volatile RestfulResultSet resultSet;
this.conn = c; private volatile int affectedRows;
private volatile boolean closeOnCompletion;
public RestfulStatement(RestfulConnection conn, String database) {
this.conn = conn;
this.database = database; this.database = database;
} }
private String[] parseTableIdentifier(String sql) {
sql = sql.trim().toLowerCase();
String[] ret = null;
if (sql.contains("where"))
sql = sql.substring(0, sql.indexOf("where"));
if (sql.contains("interval"))
sql = sql.substring(0, sql.indexOf("interval"));
if (sql.contains("fill"))
sql = sql.substring(0, sql.indexOf("fill"));
if (sql.contains("sliding"))
sql = sql.substring(0, sql.indexOf("sliding"));
if (sql.contains("group by"))
sql = sql.substring(0, sql.indexOf("group by"));
if (sql.contains("order by"))
sql = sql.substring(0, sql.indexOf("order by"));
if (sql.contains("slimit"))
sql = sql.substring(0, sql.indexOf("slimit"));
if (sql.contains("limit"))
sql = sql.substring(0, sql.indexOf("limit"));
// parse
if (sql.contains("from")) {
sql = sql.substring(sql.indexOf("from") + 4).trim();
return Arrays.asList(sql.split(",")).stream()
.map(tableIdentifier -> {
tableIdentifier = tableIdentifier.trim();
if (tableIdentifier.contains(" "))
tableIdentifier = tableIdentifier.substring(0, tableIdentifier.indexOf(" "));
return tableIdentifier;
}).collect(Collectors.joining(",")).split(",");
}
return ret;
}
@Override @Override
public ResultSet executeQuery(String sql) throws SQLException { public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed()) if (isClosed())
...@@ -29,43 +69,33 @@ public class RestfulStatement implements Statement { ...@@ -29,43 +69,33 @@ public class RestfulStatement implements Statement {
throw new SQLException("not a select sql for executeQuery: " + sql); throw new SQLException("not a select sql for executeQuery: " + sql);
final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql"; final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql";
// row data
String result = HttpClientPoolUtil.execute(url, sql); String result = HttpClientPoolUtil.execute(url, sql);
String fields = ""; JSONObject resultJson = JSON.parseObject(result);
List<String> words = Arrays.asList(sql.split(" ")); if (resultJson.getString("status").equals("error")) {
if (words.get(0).equalsIgnoreCase("select")) { throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + resultJson.getString("desc") + "\n" + "error code: " + resultJson.getString("code")));
int index = 0;
if (words.contains("from")) {
index = words.indexOf("from");
}
if (words.contains("FROM")) {
index = words.indexOf("FROM");
}
fields = HttpClientPoolUtil.execute(url, "DESCRIBE " + words.get(index + 1));
}
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " +
jsonObject.getString("desc") + "\n" +
"error code: " + jsonObject.getString("code")));
}
String dataStr = jsonObject.getString("data");
if ("use".equalsIgnoreCase(fields.split(" ")[0])) {
return new RestfulResultSet(dataStr, "");
} }
JSONObject jsonField = JSON.parseObject(fields); // parse table name from sql
if (jsonField == null) { String[] tableIdentifiers = parseTableIdentifier(sql);
return new RestfulResultSet(dataStr, ""); if (tableIdentifiers != null) {
} List<JSONObject> fieldJsonList = new ArrayList<>();
if (jsonField.getString("status").equals("error")) { for (String tableIdentifier : tableIdentifiers) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + // field meta
jsonField.getString("desc") + "\n" + String fields = HttpClientPoolUtil.execute(url, "DESCRIBE " + tableIdentifier);
"error code: " + jsonField.getString("code"))); JSONObject fieldJson = JSON.parseObject(fields);
if (fieldJson.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + fieldJson.getString("desc") + "\n" + "error code: " + fieldJson.getString("code")));
}
fieldJsonList.add(fieldJson);
}
this.resultSet = new RestfulResultSet(database, this, resultJson, fieldJsonList);
} else {
this.resultSet = new RestfulResultSet(database, this, resultJson);
} }
String fieldData = jsonField.getString("data");
return new RestfulResultSet(dataStr, fieldData); this.affectedRows = 0;
return resultSet;
} }
@Override @Override
...@@ -78,77 +108,103 @@ public class RestfulStatement implements Statement { ...@@ -78,77 +108,103 @@ public class RestfulStatement implements Statement {
if (this.database == null) if (this.database == null)
throw new SQLException("Database not specified or available"); throw new SQLException("Database not specified or available");
final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql"; final String url = "http://" + conn.getHost().trim() + ":" + conn.getPort() + "/rest/sql";
HttpClientPoolUtil.execute(url, "use " + conn.getDatabase()); // HttpClientPoolUtil.execute(url, "use " + conn.getDatabase());
String result = HttpClientPoolUtil.execute(url, sql); String result = HttpClientPoolUtil.execute(url, sql);
JSONObject jsonObject = JSON.parseObject(result); JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject.getString("status").equals("error")) { if (jsonObject.getString("status").equals("error")) {
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + jsonObject.getString("desc") + "\n" + "error code: " + jsonObject.getString("code")));
jsonObject.getString("desc") + "\n" +
"error code: " + jsonObject.getString("code")));
} }
return Integer.parseInt(jsonObject.getString("rows")); this.resultSet = null;
this.affectedRows = Integer.parseInt(jsonObject.getString("rows"));
return this.affectedRows;
} }
@Override @Override
public void close() throws SQLException { public void close() throws SQLException {
this.closed = true; synchronized (RestfulStatement.class) {
if (!isClosed())
this.closed = true;
}
} }
@Override @Override
public int getMaxFieldSize() throws SQLException { public int getMaxFieldSize() throws SQLException {
return 0; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return TSDBConstants.maxFieldSize;
} }
@Override @Override
public void setMaxFieldSize(int max) throws SQLException { public void setMaxFieldSize(int max) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
if (max < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
// nothing to do
} }
@Override @Override
public int getMaxRows() throws SQLException { public int getMaxRows() throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return 0; return 0;
} }
@Override @Override
public void setMaxRows(int max) throws SQLException { public void setMaxRows(int max) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
if (max < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
// nothing to do
} }
@Override @Override
public void setEscapeProcessing(boolean enable) throws SQLException { public void setEscapeProcessing(boolean enable) throws SQLException {
if (isClosed())
throw new SQLException(RestfulStatement.STATEMENT_CLOSED);
} }
@Override @Override
public int getQueryTimeout() throws SQLException { public int getQueryTimeout() throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return 0; return 0;
} }
@Override @Override
public void setQueryTimeout(int seconds) throws SQLException { public void setQueryTimeout(int seconds) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
if (seconds < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
} }
@Override @Override
public void cancel() throws SQLException { public void cancel() throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public SQLWarning getWarnings() throws SQLException { public SQLWarning getWarnings() throws SQLException {
//TODO: getWarnings not Implemented if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return null; return null;
} }
@Override @Override
public void clearWarnings() throws SQLException { public void clearWarnings() throws SQLException {
// nothing to do
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
} }
@Override @Override
public void setCursorName(String name) throws SQLException { public void setCursorName(String name) throws SQLException {
if (isClosed())
throw new SQLException(RestfulStatement.STATEMENT_CLOSED);
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
...@@ -159,133 +215,181 @@ public class RestfulStatement implements Statement { ...@@ -159,133 +215,181 @@ public class RestfulStatement implements Statement {
//如果执行了use操作应该将当前Statement的catalog设置为新的database //如果执行了use操作应该将当前Statement的catalog设置为新的database
if (SqlSyntaxValidator.isUseSql(sql)) { if (SqlSyntaxValidator.isUseSql(sql)) {
this.database = sql.trim().replace("use", "").trim(); this.database = sql.trim().replace("use", "").trim();
this.conn.setCatalog(this.database);
} }
if (this.database == null) if (this.database == null)
throw new SQLException("Database not specified or available"); throw new SQLException("Database not specified or available");
final String url = "http://" + conn.getHost() + ":" + conn.getPort() + "/rest/sql"; if (SqlSyntaxValidator.isSelectSql(sql)) {
// use database executeQuery(sql);
HttpClientPoolUtil.execute(url, "use " + conn.getDatabase()); } else if (SqlSyntaxValidator.isShowSql(sql) || SqlSyntaxValidator.isDescribeSql(sql)) {
// execute sql final String url = "http://" + conn.getHost().trim() + ":" + conn.getPort() + "/rest/sql";
String result = HttpClientPoolUtil.execute(url, sql); if (!SqlSyntaxValidator.isShowDatabaseSql(sql)) {
// parse result HttpClientPoolUtil.execute(url, "use " + conn.getDatabase());
JSONObject jsonObject = JSON.parseObject(result); }
if (jsonObject.getString("status").equals("error")) { String result = HttpClientPoolUtil.execute(url, sql);
throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + JSONObject resultJson = JSON.parseObject(result);
jsonObject.getString("desc") + "\n" + if (resultJson.getString("status").equals("error")) {
"error code: " + jsonObject.getString("code"))); throw new SQLException(TSDBConstants.WrapErrMsg("SQL execution error: " + resultJson.getString("desc") + "\n" + "error code: " + resultJson.getString("code")));
}
this.resultSet = new RestfulResultSet(database, this, resultJson);
} else {
executeUpdate(sql);
} }
return true; return true;
} }
@Override @Override
public ResultSet getResultSet() throws SQLException { public ResultSet getResultSet() throws SQLException {
return null; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return resultSet;
} }
@Override @Override
public int getUpdateCount() throws SQLException { public int getUpdateCount() throws SQLException {
return 0; if (isClosed()) {
throw new SQLException("Invalid method call on a closed statement.");
}
return this.affectedRows;
} }
@Override @Override
public boolean getMoreResults() throws SQLException { public boolean getMoreResults() throws SQLException {
return false; return getMoreResults(CLOSE_CURRENT_RESULT);
} }
@Override @Override
public void setFetchDirection(int direction) throws SQLException { public void setFetchDirection(int direction) throws SQLException {
if (direction != ResultSet.FETCH_FORWARD && direction != ResultSet.FETCH_REVERSE && direction != ResultSet.FETCH_UNKNOWN)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
this.resultSet.setFetchDirection(direction);
} }
@Override @Override
public int getFetchDirection() throws SQLException { public int getFetchDirection() throws SQLException {
return 0; return this.resultSet.getFetchDirection();
} }
@Override @Override
public void setFetchSize(int rows) throws SQLException { public void setFetchSize(int rows) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
if (rows < 0)
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
//nothing to do
} }
@Override @Override
public int getFetchSize() throws SQLException { public int getFetchSize() throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return 0; return 0;
} }
@Override @Override
public int getResultSetConcurrency() throws SQLException { public int getResultSetConcurrency() throws SQLException {
return 0; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return this.resultSet.getConcurrency();
} }
@Override @Override
public int getResultSetType() throws SQLException { public int getResultSetType() throws SQLException {
return 0; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return this.resultSet.getType();
} }
@Override @Override
public void addBatch(String sql) throws SQLException { public void addBatch(String sql) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
//TODO:
} }
@Override @Override
public void clearBatch() throws SQLException { public void clearBatch() throws SQLException {
//TODO:
} }
@Override @Override
public int[] executeBatch() throws SQLException { public int[] executeBatch() throws SQLException {
//TODO:
return new int[0]; return new int[0];
} }
@Override @Override
public Connection getConnection() throws SQLException { public Connection getConnection() throws SQLException {
return null; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return this.conn;
} }
@Override @Override
public boolean getMoreResults(int current) throws SQLException { public boolean getMoreResults(int current) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
if (resultSet == null)
return false;
// switch (current) {
// case CLOSE_CURRENT_RESULT:
// resultSet.close();
// break;
// case KEEP_CURRENT_RESULT:
// break;
// case CLOSE_ALL_RESULTS:
// resultSet.close();
// break;
// default:
// throw new SQLException(TSDBConstants.INVALID_VARIABLES);
// }
// return next;
return false; return false;
} }
@Override @Override
public ResultSet getGeneratedKeys() throws SQLException { public ResultSet getGeneratedKeys() throws SQLException {
return null; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return 0; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return 0; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException { public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return 0; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return false; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException { public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return false; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public boolean execute(String sql, String[] columnNames) throws SQLException { public boolean execute(String sql, String[] columnNames) throws SQLException {
return false; throw new SQLFeatureNotSupportedException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
} }
@Override @Override
public int getResultSetHoldability() throws SQLException { public int getResultSetHoldability() throws SQLException {
return 0; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return this.resultSet.getHoldability();
} }
@Override @Override
...@@ -295,22 +399,30 @@ public class RestfulStatement implements Statement { ...@@ -295,22 +399,30 @@ public class RestfulStatement implements Statement {
@Override @Override
public void setPoolable(boolean poolable) throws SQLException { public void setPoolable(boolean poolable) throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
//nothing to do
} }
@Override @Override
public boolean isPoolable() throws SQLException { public boolean isPoolable() throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return false; return false;
} }
@Override @Override
public void closeOnCompletion() throws SQLException { public void closeOnCompletion() throws SQLException {
if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
this.closeOnCompletion = true;
} }
@Override @Override
public boolean isCloseOnCompletion() throws SQLException { public boolean isCloseOnCompletion() throws SQLException {
return false; if (isClosed())
throw new SQLException(STATEMENT_CLOSED);
return this.closeOnCompletion;
} }
@Override @Override
......
...@@ -17,6 +17,8 @@ import org.apache.http.protocol.HTTP; ...@@ -17,6 +17,8 @@ import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import java.nio.charset.Charset;
public class HttpClientPoolUtil { public class HttpClientPoolUtil {
public static PoolingHttpClientConnectionManager cm = null; public static PoolingHttpClientConnectionManager cm = null;
...@@ -94,7 +96,9 @@ public class HttpClientPoolUtil { ...@@ -94,7 +96,9 @@ public class HttpClientPoolUtil {
initPools(); initPools();
} }
method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0); method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME, DEFAULT_CONTENT_TYPE, 0);
method.setEntity(new StringEntity(data)); method.setHeader("Authorization", "Basic cm9vdDp0YW9zZGF0YQ==");
method.setHeader("Content-Type", "text/plain");
method.setEntity(new StringEntity(data, Charset.forName("UTF-8")));
HttpContext context = HttpClientContext.create(); HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context); CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity(); httpEntity = httpResponse.getEntity();
...@@ -105,26 +109,13 @@ public class HttpClientPoolUtil { ...@@ -105,26 +109,13 @@ public class HttpClientPoolUtil {
if (method != null) { if (method != null) {
method.abort(); method.abort();
} }
// e.printStackTrace(); new Exception("execute post request exception, url:" + uri + ", exception:" + e.toString() + ", cost time(ms):" + (System.currentTimeMillis() - startTime)).printStackTrace();
// logger.error("execute post request exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception("execute post request exception, url:"
+ uri + ", exception:" + e.toString() +
", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
} finally { } finally {
if (httpEntity != null) { if (httpEntity != null) {
try { try {
EntityUtils.consumeQuietly(httpEntity); EntityUtils.consumeQuietly(httpEntity);
} catch (Exception e) { } catch (Exception e) {
// e.printStackTrace(); new Exception("close response exception, url:" + uri + ", exception:" + e.toString() + ", cost time(ms):" + (System.currentTimeMillis() - startTime)).printStackTrace();
// logger.error("close response exception, url:" + uri + ", exception:" + e.toString()
// + ", cost time(ms):" + (System.currentTimeMillis() - startTime));
new Exception(
"close response exception, url:" + uri +
", exception:" + e.toString()
+ ", cost time(ms):" + (System.currentTimeMillis() - startTime))
.printStackTrace();
} }
} }
} }
......
...@@ -15,14 +15,12 @@ ...@@ -15,14 +15,12 @@
package com.taosdata.jdbc.utils; package com.taosdata.jdbc.utils;
import com.taosdata.jdbc.TSDBConnection; import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBJNIConnector;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException;
public class SqlSyntaxValidator { public class SqlSyntaxValidator {
private static final String[] updateSQL = {"insert", "update", "delete", "create", "alter", "drop", "show", "describe", "use"}; private static final String[] updateSQL = {"insert", "update", "delete", "create", "alter", "drop", "show", "describe", "use", "import"};
private static final String[] querySQL = {"select"}; private static final String[] querySQL = {"select"};
private TSDBConnection tsdbConnection; private TSDBConnection tsdbConnection;
...@@ -31,22 +29,6 @@ public class SqlSyntaxValidator { ...@@ -31,22 +29,6 @@ public class SqlSyntaxValidator {
this.tsdbConnection = (TSDBConnection) connection; this.tsdbConnection = (TSDBConnection) connection;
} }
public boolean validateSqlSyntax(String sql) throws SQLException {
boolean res = false;
if (tsdbConnection == null || tsdbConnection.isClosed()) {
throw new SQLException("invalid connection");
} else {
TSDBJNIConnector jniConnector = tsdbConnection.getConnection();
if (jniConnector == null) {
throw new SQLException("jniConnector is null");
} else {
res = jniConnector.validateCreateTableSql(sql);
}
}
return res;
}
public static boolean isValidForExecuteUpdate(String sql) { public static boolean isValidForExecuteUpdate(String sql) {
for (String prefix : updateSQL) { for (String prefix : updateSQL) {
if (sql.trim().toLowerCase().startsWith(prefix)) if (sql.trim().toLowerCase().startsWith(prefix))
...@@ -56,18 +38,28 @@ public class SqlSyntaxValidator { ...@@ -56,18 +38,28 @@ public class SqlSyntaxValidator {
} }
public static boolean isUseSql(String sql) { public static boolean isUseSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[8]) || sql.trim().toLowerCase().matches("create\\s*database.*") || sql.toLowerCase().toLowerCase().matches("drop\\s*database.*"); return sql.trim().toLowerCase().startsWith("use") || sql.trim().toLowerCase().matches("create\\s*database.*") || sql.toLowerCase().toLowerCase().matches("drop\\s*database.*");
}
public static boolean isShowSql(String sql) {
return sql.trim().toLowerCase().startsWith("show");
} }
public static boolean isUpdateSql(String sql) { public static boolean isDescribeSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[1]); return sql.trim().toLowerCase().startsWith("describe");
} }
public static boolean isInsertSql(String sql) { public static boolean isInsertSql(String sql) {
return sql.trim().toLowerCase().startsWith(updateSQL[0]); return sql.trim().toLowerCase().startsWith("insert") || sql.trim().toLowerCase().startsWith("import");
} }
public static boolean isSelectSql(String sql) { public static boolean isSelectSql(String sql) {
return sql.trim().toLowerCase().startsWith(querySQL[0]); return sql.trim().toLowerCase().startsWith("select");
}
public static boolean isShowDatabaseSql(String sql) {
return sql.trim().toLowerCase().matches("show\\s*databases");
} }
} }
package com.taosdata.jdbc.rs; package com.taosdata.jdbc.rs;
import org.junit.*; import org.junit.*;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
......
...@@ -188,8 +188,8 @@ void dnodeReprocessMWriteMsg(void *pMsg) { ...@@ -188,8 +188,8 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
++pWrite->pBatchMasterMsg->received; ++pWrite->pBatchMasterMsg->received;
if (pWrite->pBatchMasterMsg->successed + pWrite->pBatchMasterMsg->received if (pWrite->pBatchMasterMsg->successed + pWrite->pBatchMasterMsg->received
>= pWrite->pBatchMasterMsg->expected) { >= pWrite->pBatchMasterMsg->expected) {
dnodeSendRedirectMsg(&pWrite->rpcMsg, true); dnodeSendRedirectMsg(&pWrite->pBatchMasterMsg->rpcMsg, true);
dnodeFreeMWriteMsg(pWrite); dnodeFreeMWriteMsg(pWrite->pBatchMasterMsg);
} }
mnodeDestroySubMsg(pWrite); mnodeDestroySubMsg(pWrite);
......
...@@ -60,7 +60,7 @@ int32_t dnodeInitServer() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitServer() {
rpcInit.label = "DND-S"; rpcInit.label = "DND-S";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessReqMsgFromDnode; rpcInit.cfp = dnodeProcessReqMsgFromDnode;
rpcInit.sessions = TSDB_MAX_VNODES; rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
...@@ -123,7 +123,7 @@ int32_t dnodeInitClient() { ...@@ -123,7 +123,7 @@ int32_t dnodeInitClient() {
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromDnode; rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.sessions = TSDB_MAX_VNODES; rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t"; rpcInit.user = "t";
......
...@@ -6,12 +6,12 @@ ...@@ -6,12 +6,12 @@
"user": "root", "user": "root",
"password": "taosdata", "password": "taosdata",
"databases": "db01", "databases": "db01",
"super_table_query": "specified_table_query":
{"rate":1, "concurrent":1, {"query_interval":1, "concurrent":1,
"sqls": [{"sql": "select count(*) from stb01", "result": "./query_res0.txt"}] "sqls": [{"sql": "select count(*) from stb01", "result": "./query_res0.txt"}]
}, },
"sub_table_query": "super_table_query":
{"stblname": "stb01", "rate":1, "threads":1, {"stblname": "stb01", "query_interval":1, "threads":1,
"sqls": [{"sql": "select count(*) from xxxx", "result": "./query_res1.txt"}] "sqls": [{"sql": "select count(*) from xxxx", "result": "./query_res1.txt"}]
} }
} }
此差异已折叠。
...@@ -311,6 +311,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { ...@@ -311,6 +311,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (pCfg->replications > mnodeGetDnodesNum()) {
mError("no enough dnode to config replica: %d, #dnodes: %d", pCfg->replications, mnodeGetDnodesNum());
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
if (pCfg->quorum < TSDB_MIN_DB_REPLICA_OPTION || pCfg->quorum > TSDB_MAX_DB_REPLICA_OPTION) { if (pCfg->quorum < TSDB_MIN_DB_REPLICA_OPTION || pCfg->quorum > TSDB_MAX_DB_REPLICA_OPTION) {
mError("invalid db option quorum:%d valid range: [%d, %d]", pCfg->quorum, TSDB_MIN_DB_REPLICA_OPTION, mError("invalid db option quorum:%d valid range: [%d, %d]", pCfg->quorum, TSDB_MIN_DB_REPLICA_OPTION,
TSDB_MAX_DB_REPLICA_OPTION); TSDB_MAX_DB_REPLICA_OPTION);
......
...@@ -827,21 +827,21 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -827,21 +827,21 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg)); SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg); int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
++pMsg->pBatchMasterMsg->successed; ++pMsg->pBatchMasterMsg->successed;
mnodeDestroySubMsg(pMsg); mnodeDestroySubMsg(pMsg);
} } else if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
return code;
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { } else if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySubMsg(pMsg); ++pMsg->pBatchMasterMsg->received;
return code; mnodeDestroySubMsg(pMsg);
} }
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
>= pMsg->pBatchMasterMsg->expected) { >= pMsg->pBatchMasterMsg->expected) {
return code; dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, TSDB_CODE_SUCCESS);
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else { // batch master replay, reprocess the whole batch } else { // batch master replay, reprocess the whole batch
assert(0); assert(0);
} }
......
此差异已折叠。
...@@ -1567,6 +1567,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1567,6 +1567,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
// for response, if code is auth failure, it shall bypass the auth process // for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code); code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_INVALID_VERSION ||
code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册