未验证 提交 d03b5c43 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-531]<feature>: jdbc parameter bind multi tables (#8537)

上级 608c3af3
...@@ -36,15 +36,15 @@ import java.util.regex.Pattern; ...@@ -36,15 +36,15 @@ import java.util.regex.Pattern;
* compatibility needs. * compatibility needs.
*/ */
public class TSDBPreparedStatement extends TSDBStatement implements PreparedStatement { public class TSDBPreparedStatement extends TSDBStatement implements PreparedStatement {
// for jdbc preparedStatement interface
private String rawSql; private String rawSql;
private Object[] parameters; private Object[] parameters;
// for parameter binding
private ArrayList<ColumnInfo> colData; private long nativeStmtHandle = 0;
private String tableName;
private ArrayList<TableTagInfo> tableTags; private ArrayList<TableTagInfo> tableTags;
private int tagValueLength; private int tagValueLength;
private ArrayList<ColumnInfo> colData;
private String tableName;
private long nativeStmtHandle = 0;
TSDBPreparedStatement(TSDBConnection connection, String sql) { TSDBPreparedStatement(TSDBConnection connection, String sql) {
super(connection); super(connection);
...@@ -72,10 +72,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -72,10 +72,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
preprocessSql(); preprocessSql();
} }
/*
*
*/
/** /**
* Some of the SQLs sent by other popular frameworks or tools like Spark, contains syntax that cannot be parsed by * Some of the SQLs sent by other popular frameworks or tools like Spark, contains syntax that cannot be parsed by
* the TDengine client. Thus, some simple parsers/filters are intentionally added in this JDBC implementation in * the TDengine client. Thus, some simple parsers/filters are intentionally added in this JDBC implementation in
...@@ -250,13 +246,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -250,13 +246,10 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
@Override @Override
public void setObject(int parameterIndex, Object x) throws SQLException { public void setObject(int parameterIndex, Object x) throws SQLException {
if (isClosed()) { if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
} if (parameterIndex < 1 && parameterIndex >= parameters.length)
if (parameterIndex < 1 && parameterIndex >= parameters.length) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE);
}
parameters[parameterIndex - 1] = x; parameters[parameterIndex - 1] = x;
} }
...@@ -335,7 +328,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -335,7 +328,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
// TODO:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
} }
...@@ -419,7 +411,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -419,7 +411,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
//TODO:
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
} }
...@@ -477,7 +468,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -477,7 +468,6 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
} }
@Override @Override
...@@ -496,7 +486,7 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -496,7 +486,7 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// NOTE: the following APIs are not JDBC compatible // NOTE: the following APIs are not JDBC compatible
// set the bind table name // parameter binding
private static class ColumnInfo { private static class ColumnInfo {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private ArrayList data; private ArrayList data;
...@@ -539,7 +529,11 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -539,7 +529,11 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
} }
} }
public void setTableName(String name) { public void setTableName(String name) throws SQLException {
if (this.tableName != null) {
this.columnDataExecuteBatch();
this.columnDataClearBatchInternal();
}
this.tableName = name; this.tableName = name;
} }
...@@ -960,17 +954,22 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -960,17 +954,22 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
public void columnDataExecuteBatch() throws SQLException { public void columnDataExecuteBatch() throws SQLException {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
connector.executeBatch(this.nativeStmtHandle); connector.executeBatch(this.nativeStmtHandle);
this.columnDataClearBatch(); this.columnDataClearBatchInternal();
} }
@Deprecated
public void columnDataClearBatch() { public void columnDataClearBatch() {
columnDataClearBatchInternal();
}
private void columnDataClearBatchInternal() {
int size = this.colData.size(); int size = this.colData.size();
this.colData.clear(); this.colData.clear();
this.colData.addAll(Collections.nCopies(size, null)); this.colData.addAll(Collections.nCopies(size, null));
this.tableName = null; // clear the table name this.tableName = null; // clear the table name
} }
public void columnDataCloseBatch() throws SQLException { public void columnDataCloseBatch() throws SQLException {
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector(); TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
connector.closeBatch(this.nativeStmtHandle); connector.closeBatch(this.nativeStmtHandle);
...@@ -978,4 +977,11 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat ...@@ -978,4 +977,11 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
this.nativeStmtHandle = 0L; this.nativeStmtHandle = 0L;
this.tableName = null; this.tableName = null;
} }
@Override
public void close() throws SQLException {
this.columnDataClearBatchInternal();
this.columnDataCloseBatch();
super.close();
}
} }
...@@ -5,9 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers; ...@@ -5,9 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers;
import org.apache.http.HeaderElement; import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator; import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*; import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.protocol.HttpClientContext;
...@@ -21,10 +19,7 @@ import org.apache.http.protocol.HTTP; ...@@ -21,10 +19,7 @@ import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import javax.net.ssl.SSLException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.SQLException; import java.sql.SQLException;
...@@ -53,10 +48,9 @@ public class HttpClientPoolUtil { ...@@ -53,10 +48,9 @@ public class HttpClientPoolUtil {
return DEFAULT_HTTP_KEEP_TIME * 1000; return DEFAULT_HTTP_KEEP_TIME * 1000;
}; };
private static CloseableHttpClient httpClient; private static final CloseableHttpClient httpClient;
static { static {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL); connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL);
connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
......
package com.taosdata.jdbc;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
public class ParameterBindTest {
private static final String host = "127.0.0.1";
private static final String stable = "weather";
private Connection conn;
private final Random random = new Random(System.currentTimeMillis());
@Test
public void test() {
// given
String[] tbnames = {"t1", "t2", "t3"};
int rows = 10;
// when
insertIntoTables(tbnames, 10);
// then
assertRows(stable, tbnames.length * rows);
for (String t : tbnames) {
assertRows(t, rows);
}
}
@Test
public void testMultiThreads() {
// given
String[][] tables = {{"t1", "t2", "t3"}, {"t4", "t5", "t6"}, {"t7", "t8", "t9"}, {"t10"}};
int rows = 10;
// when
List<Thread> threads = Arrays.stream(tables).map(tbnames -> new Thread(() -> insertIntoTables(tbnames, rows))).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// then
for (String[] table : tables) {
for (String t : table) {
assertRows(t, rows);
}
}
}
private void assertRows(String tbname, int rows) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select count(*) from " + tbname);
while (rs.next()) {
int count = rs.getInt(1);
Assert.assertEquals(rows, count);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
private void insertIntoTables(String[] tbnames, int rowsEachTable) {
long current = System.currentTimeMillis();
String sql = "insert into ? using " + stable + " tags(?, ?) values(?, ?, ?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 0; i < tbnames.length; i++) {
pstmt.setTableName(tbnames[i]);
pstmt.setTagInt(0, random.nextInt(100));
pstmt.setTagInt(1, random.nextInt(100));
ArrayList<Long> timestampList = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
timestampList.add(current + i * 1000 + j);
}
pstmt.setTimestamp(0, timestampList);
ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f1List.add(random.nextInt(100));
}
pstmt.setInt(1, f1List);
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < rowsEachTable; j++) {
f2List.add(random.nextInt(100));
}
pstmt.setInt(2, f2List);
pstmt.columnDataAddBatch();
}
pstmt.columnDataExecuteBatch();
} catch (SQLException e) {
e.printStackTrace();
}
}
@Before
public void before() {
String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
try {
conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test_pd");
stmt.execute("create database if not exists test_pd");
stmt.execute("use test_pd");
stmt.execute("create table " + stable + "(ts timestamp, f1 int, f2 int) tags(t1 int, t2 int)");
} catch (SQLException e) {
e.printStackTrace();
}
}
@After
public void after() {
try {
// Statement stmt = conn.createStatement();
// stmt.execute("drop database if exists test_pd");
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
...@@ -2,7 +2,6 @@ package com.taosdata.jdbc.utils; ...@@ -2,7 +2,6 @@ package com.taosdata.jdbc.utils;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError; import com.taosdata.jdbc.TSDBError;
import org.junit.Test; import org.junit.Test;
...@@ -11,7 +10,6 @@ import java.net.URLEncoder; ...@@ -11,7 +10,6 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
...@@ -27,11 +25,6 @@ public class HttpClientPoolUtilTest { ...@@ -27,11 +25,6 @@ public class HttpClientPoolUtilTest {
// given // given
List<Thread> threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> { List<Thread> threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> {
useDB(); useDB();
// try {
// TimeUnit.SECONDS.sleep(10);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
})).collect(Collectors.toList()); })).collect(Collectors.toList());
threads.forEach(Thread::start); threads.forEach(Thread::start);
...@@ -43,7 +36,6 @@ public class HttpClientPoolUtilTest { ...@@ -43,7 +36,6 @@ public class HttpClientPoolUtilTest {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
private void useDB() { private void useDB() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册