提交 71cb1477 编写于 作者: W wenzhouwww

Merge branch 'develop' of github.com:taosdata/TDengine into develop

......@@ -331,49 +331,238 @@ JDBC连接器可能报错的错误码包括3种:JDBC driver本身的报错(
### <a class="anchor" id="stmt-java"></a>通过参数绑定写入数据
从 2.1.2.0 版本开始,TDengine 的 **JDBC-JNI** 实现大幅改进了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。(注意:**JDBC-RESTful** 实现并不提供参数绑定这种使用方式。)
从 2.1.2.0 版本开始,TDengine 的 JDBC-JNI 实现大幅改进了参数绑定方式对数据写入(INSERT)场景的支持。采用这种方式写入数据时,能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
注意:
* JDBC-RESTful 实现并不提供参数绑定这种使用方式
* 以下示例代码基于taos-jdbcdriver-2.0.36
* binary类型数据需要调用setString方法,nchar类型数据需要调用setNString方法
* setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
示例代码:
```java
Random r = new Random();
// INSERT 语句中,VALUES 部分允许指定具体的数据列;如果采取自动建表,则 TAGS 部分需要设定全部 TAGS 列的参数值:
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags (?, ?) (ts, c1, c2) values(?, ?, ?)");
// 设定数据表名:
s.setTableName("w1");
// 设定 TAGS 取值:
s.setTagInt(0, r.nextInt(10));
s.setTagString(1, "Beijing");
int numOfRows = 10;
// VALUES 部分以逐列的方式进行设置:
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s1 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
s1.add(r.nextInt(100));
}
s.setInt(1, s1);
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
s2.add("test" + r.nextInt(100));
public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 20;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setByte(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(0, random.nextFloat());
pstmt.setTagDouble(1, random.nextDouble());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat());
pstmt.setFloat(1, f1List);
ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
// close if no try-with-catch statement is used
pstmt.close();
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "北京-abc");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("北京-abc");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
s.setString(2, s2, 10);
// AddBatch 之后,缓存并未清空。为避免混乱,并不推荐在 ExecuteBatch 之前再次绑定新一批的数据:
s.columnDataAddBatch();
// 执行绑定数据后的语句:
s.columnDataExecuteBatch();
// 执行语句后清空缓存。在清空之后,可以复用当前的对象,绑定新的一批数据(可以是新表名、新 TAGS 值、新 VALUES 值):
s.columnDataClearBatch();
// 执行完毕,释放资源:
s.columnDataCloseBatch();
```
用于设定 TAGS 取值的方法总共有:
......@@ -405,8 +594,6 @@ public void setString(int columnIndex, ArrayList<String> list, int size) throws
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
```
其中 setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽。
## <a class="anchor" id="subscribe"></a>订阅
### 创建
......
......@@ -310,46 +310,239 @@ The Java connector may report three types of error codes: JDBC Driver (error cod
- https://github.com/taosdata/TDengine/blob/develop/src/inc/taoserror.h
### Write data through parameter binding
Starting with version 2.1.2.0, TDengine's JDBC-JNI implementation significantly improves support for data write (INSERT) scenarios with Parameter-Binding. When writing data in this way, you can avoid the resource consumption of SQL parsing, which can significantly improve write performance in many cases.
Note:
* Jdbc-restful implementations do not provide Parameter-Binding
* The following sample code is based on taos-jdbcdriver-2.0.36
* use setString to bind BINARY data, and use setNString to bind NCHAR data
* Both setString and setNString require the user to declare the column width of the corresponding column in the table definition in the size parameter
Since version 2.1.2.0, TDengine's JDBC-JNI implementation has significantly improved parameter binding support for data write (INSERT) scenarios. Data can be written in the following way, avoiding SQL parsing and significantly improving the write performance.(**Note**: parameter binding is not supported in JDBC-RESTful)
Sample Code:
```java
Statement stmt = conn.createStatement();
Random r = new Random();
// In the INSERT statement, the VALUES clause allows you to specify a specific column; If automatic table creation is adopted, the TAGS clause needs to set the parameter values of all TAGS columns
TSDBPreparedStatement s = (TSDBPreparedStatement) conn.prepareStatement("insert into ? using weather_test tags (?, ?) (ts, c1, c2) values(?, ?, ?)");
s.setTableName("w1");
// set tags
s.setTagInt(0, r.nextInt(10));
s.setTagString(1, "Beijing");
int numOfRows = 10;
// set values
ArrayList<Long> ts = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
ts.add(System.currentTimeMillis() + i);
}
s.setTimestamp(0, ts);
ArrayList<Integer> s1 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
s1.add(r.nextInt(100));
}
s.setInt(1, s1);
ArrayList<String> s2 = new ArrayList<>();
for (int i = 0; i < numOfRows; i++){
s2.add("test" + r.nextInt(100));
public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 20;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setByte(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(0, random.nextFloat());
pstmt.setTagDouble(1, random.nextDouble());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat());
pstmt.setFloat(1, f1List);
ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
// close if no try-with-catch statement is used
pstmt.close();
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "北京-abc");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("北京-abc");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
s.setString(2, s2, 10);
// The cache is not cleared after AddBatch. Do not bind new data again before ExecuteBatch
s.columnDataAddBatch();
s.columnDataExecuteBatch();
// Clear the cache, after which you can bind new data(including table names, tags, values):
s.columnDataClearBatch();
s.columnDataCloseBatch();
```
The methods used to set tags are:
......@@ -383,8 +576,6 @@ public void setString(int columnIndex, ArrayList<String> list, int size) throws
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
```
**Note**: Both setString and setNString require the user to declare the column width of the corresponding column in the table definition in the size parameter.
### Data Subscription
#### Subscribe
......
......@@ -143,6 +143,7 @@ static bool validateDebugFlag(int32_t v);
static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo);
static tSqlExpr* extractExprForSTable(SSqlCmd* pCmd, tSqlExpr** pExpr, SQueryInfo* pQueryInfo, int32_t tableIndex);
static void convertWhereStringCharset(tSqlExpr* pRight);
int validateTableName(char *tblName, int len, SStrToken* psTblToken, bool *dbIncluded);
......@@ -4963,25 +4964,23 @@ static int32_t validateJsonTagExpr(tSqlExpr* pExpr, char* msgBuf) {
return invalidOperationMsg(msgBuf, msg3);
if (pLeft->pRight && (pLeft->pRight->value.nLen > TSDB_MAX_JSON_KEY_LEN || pLeft->pRight->value.nLen <= 0))
return invalidOperationMsg(msgBuf, msg2);
if (pRight->tokenId == TK_NULL && pExpr->tokenId == TK_EQ) {
// transform for json->'key'=null
pRight->tokenId = TK_STRING;
pRight->value.nType = TSDB_DATA_TYPE_BINARY;
pRight->value.nLen = INT_BYTES;
pRight->value.pz = calloc(INT_BYTES, 1);
*(uint32_t*)pRight->value.pz = TSDB_DATA_JSON_null;
return TSDB_CODE_SUCCESS;
}
}
if (pRight->value.nType == TSDB_DATA_TYPE_BINARY){ // json value store by nchar, so need to convert from binary to nchar
if(pRight->value.nLen == INT_BYTES && *(uint32_t*)pRight->value.pz == TSDB_DATA_JSON_null){
return TSDB_CODE_SUCCESS;
}
if(pRight->value.nLen == 0){
pRight->value.nType = TSDB_DATA_TYPE_NCHAR;
return TSDB_CODE_SUCCESS;
}
char newData[TSDB_MAX_JSON_TAGS_LEN] = {0};
int len = 0;
if(!taosMbsToUcs4(pRight->value.pz, pRight->value.nLen, newData, TSDB_MAX_JSON_TAGS_LEN, &len)){
tscError("json where condition mbsToUcs4 error");
}
pRight->value.pz = realloc(pRight->value.pz, len);
memcpy(pRight->value.pz, newData, len);
pRight->value.nLen = len;
pRight->value.nType = TSDB_DATA_TYPE_NCHAR;
convertWhereStringCharset(pRight);
}
}
......@@ -5045,6 +5044,34 @@ int32_t handleNeOptr(tSqlExpr** rexpr, tSqlExpr* expr) {
return TSDB_CODE_SUCCESS;
}
void convertWhereStringCharset(tSqlExpr* pRight){
if(pRight->value.nType != TSDB_DATA_TYPE_BINARY || pRight->value.nLen == 0){
return;
}
char *newData = calloc(pRight->value.nLen * TSDB_NCHAR_SIZE, 1);
if(!newData){
tscError("convertWhereStringCharset calloc memory error");
return;
}
int len = 0;
if(!taosMbsToUcs4(pRight->value.pz, pRight->value.nLen, newData, pRight->value.nLen * TSDB_NCHAR_SIZE, &len)){
tscError("nchar in where condition mbsToUcs4 error");
free(newData);
return;
}
char* tmp = realloc(pRight->value.pz, len);
if (!tmp){
tscError("convertWhereStringCharset realloc memory error");
free(newData);
return;
}
pRight->value.pz = tmp;
memcpy(pRight->value.pz, newData, len);
pRight->value.nLen = len;
pRight->value.nType = TSDB_DATA_TYPE_NCHAR;
free(newData);
}
static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t* tbIdx, int32_t parentOptr, tSqlExpr** columnExpr,
......@@ -5061,14 +5088,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
SStrToken* colName = NULL;
if(pLeft->tokenId == TK_ARROW){
colName = &(pLeft->pLeft->columnName);
if (pRight->tokenId == TK_NULL && (*pExpr)->tokenId == TK_EQ) {
// transform for json->'key'=null
pRight->tokenId = TK_STRING;
pRight->value.nType = TSDB_DATA_TYPE_BINARY;
pRight->value.nLen = INT_BYTES;
pRight->value.pz = calloc(INT_BYTES, 1);
*(uint32_t*)pRight->value.pz = TSDB_DATA_JSON_null;
}
}else{
colName = &(pLeft->columnName);
}
......@@ -5105,6 +5124,11 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_NCHAR){
convertWhereStringCharset(pRight);
}
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP && index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { // query on time range
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -5511,18 +5511,20 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in
if(item->type == cJSON_String){ // add json value format: type|data
char *jsonValue = item->valuestring;
outLen = 0;
char tagVal[TSDB_MAX_JSON_TAGS_LEN] = {0};
char *tagVal = calloc(strlen(jsonValue) * TSDB_NCHAR_SIZE + TSDB_NCHAR_SIZE, 1);
*tagVal = jsonType2DbType(0, item->type); // type
char* tagData = POINTER_SHIFT(tagVal,CHAR_BYTES);
if (!taosMbsToUcs4(jsonValue, strlen(jsonValue), varDataVal(tagData),
TSDB_MAX_JSON_TAGS_LEN - CHAR_BYTES - VARSTR_HEADER_SIZE, &outLen)) {
(int32_t)(strlen(jsonValue) * TSDB_NCHAR_SIZE), &outLen)) {
tscError("json string error:%s|%s", strerror(errno), jsonValue);
retCode = tscSQLSyntaxErrMsg(errMsg, "serizelize json error", NULL);
free(tagVal);
goto end;
}
varDataSetLen(tagData, outLen);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, TSDB_DATA_TYPE_NCHAR, tagVal, true);
free(tagVal);
}else if(item->type == cJSON_Number){
if(!isfinite(item->valuedouble)){
tscError("json value is invalidate");
......
......@@ -4231,7 +4231,6 @@ char* parseTagDatatoJson(void *p){
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
}else{ // json value
char tagJsonValue[TSDB_MAX_JSON_TAGS_LEN] = {0};
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *(char*)val;
if(type == TSDB_DATA_TYPE_BINARY) {
......@@ -4244,14 +4243,16 @@ char* parseTagDatatoJson(void *p){
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) {
char *tagJsonValue = calloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) {
tsdbError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)val);
free(tagJsonValue);
goto end;
}
cJSON* value = cJSON_CreateString(tagJsonValue);
free(tagJsonValue);
if (value == NULL)
{
goto end;
......
......@@ -59,6 +59,7 @@ class TDTestCase:
assert subResult == expectResult , "Queryfile:%s ,result is %s != expect: %s" % args0
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
......@@ -66,24 +67,38 @@ class TDTestCase:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
# clear env
os.system("ps -ef |grep 'taosdemoAllTest/taosdemoTestSupportNanoSubscribe.json' |grep -v 'grep' |awk '{print $2}'|xargs kill -9")
# clear envs
os.system("ps -aux |grep 'taosdemoAllTest/taosdemoTestSupportNanoSubscribe.json' |awk '{print $2}'|xargs kill -9 >/dev/null 2>&1")
os.system("ps -aux |grep 'tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseInsertForSub.json' |awk '{print $2}'|xargs kill -9 >/dev/null 2>&1")
os.system("rm -rf ./subscribe_res*")
os.system("rm -rf ./all_subscribe_res*")
# insert data
os.system("%staosBenchmark -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseInsertForSub.json" % binPath)
os.system("nohup %staosBenchmark -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoSubscribe.json &" % binPath)
query_pid = int(subprocess.getstatusoutput('ps aux|grep "taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoSubscribe.json" |grep -v "grep"|awk \'{print $2}\'')[1])
os.system("nohup %staosBenchmark -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseInsertForSub.json & >/dev/null 2>&1" % binPath)
sleep(5)
tdSql.query("select count(*) from subnsdb.stb0")
if tdSql.checkData(0,0,100):
pass
else:
sleep(5)
tdSql.query("select count(*) from subnsdb.stb0") # if records not write done ,sleep and wait records write done!
os.system(" nohup %staosBenchmark -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoSubscribe.json & >/dev/null 2>&1" % binPath)
sleep(5)
if os.path.exists("./subscribe_res0.txt") and os.path.exists("./subscribe_res1.txt") and os.path.exists("./subscribe_res2.txt"):
pass
else:
sleep(5) # make sure query is ok
print('taosBenchmark query done!')
# merge result files
sleep(5)
os.system("cat subscribe_res0.txt* > all_subscribe_res0.txt")
os.system("cat subscribe_res1.txt* > all_subscribe_res1.txt")
os.system("cat subscribe_res2.txt* > all_subscribe_res2.txt")
sleep(5)
# correct subscribeTimes testcase
subTimes0 = self.subTimes("all_subscribe_res0.txt")
......@@ -103,19 +118,18 @@ class TDTestCase:
os.system("cat subscribe_res0.txt* > all_subscribe_res0.txt")
subTimes0 = self.subTimes("all_subscribe_res0.txt")
print("pass")
self.assertCheck("all_subscribe_res0.txt",subTimes0 ,202)
# correct data testcase
os.system("kill -9 %d" % query_pid)
sleep(3)
os.system("rm -rf ./subscribe_res*")
os.system("rm -rf ./all_subscribe*")
os.system("rm -rf ./*.py.sql")
os.system("rm -rf ./nohup*")
os.system("ps -aux |grep 'taosdemoAllTest/taosdemoTestSupportNanoSubscribe.json' |awk '{print $2}'|xargs kill -9 >/dev/null 2>&1")
os.system("ps -aux |grep 'tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoSubscribe.json' |awk '{print $2}'|xargs kill -9 >/dev/null 2>&1")
os.system("ps -aux |grep 'tools/taosdemoAllTest/NanoTestCase/taosdemoTestNanoDatabaseInsertForSub.json' |awk '{print $2}'|xargs kill -9 >/dev/null 2>&1")
def stop(self):
tdSql.close()
......@@ -123,3 +137,4 @@ class TDTestCase:
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from posixpath import split
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1420041600000 # 2015-01-01 00:00:00 this is begin time for first record
self.num = 10
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def caseDescription(self):
'''
case1 <wenzhouwww>: [TD-11389] :
this test case is an test case for cache error , it will let the cached data obtained by the client that has connected to taosd incorrect,
root cause : table schema is changed, tag hostname size is increased through schema-less insertion. The schema cache of client taos is not refreshed.
'''
return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def getcfgPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
print(selfPath)
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
cfgPath = projPath + "/sim/dnode1/cfg "
return cfgPath
def run(self):
tdSql.prepare()
tdSql.execute("create database if not exists testdb keep 36500;")
tdSql.execute("use testdb;")
tdSql.execute("create stable st (ts timestamp , id int , value double) tags(hostname binary(10) ,ind int);")
for i in range(self.num):
tdSql.execute("insert into sub_%s using st tags('host_%s' , %d) values (%d , %d , %f );"%(str(i),str(i),i*10,self.ts+10000*i,i*2,i+10.00))
tdSql.query('select elapsed(ts,10s) from sub_1 where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" session(ts,1d) ;')
cfg_path = self.getcfgPath()
print(cfg_path)
# tdSql.execute('select elapsed(ts,10s) from st where ts>="2015-01-01 00:00:00.000" and ts < "2015-01-01 00:10:00.000" session(ts,1d) group by tbname;') # session not support super table
os.system("taos -c %s -s 'select elapsed(ts,10s) from testdb.st where ts>=\"2015-01-01 00:00:00.000\" and ts < \"2015-01-01 00:10:00.000\" session(ts,1d) group by tbname;' " % (cfg_path))
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册