未验证 提交 137ea2d0 编写于 作者: H huolibo 提交者: GitHub

[TD-11946]<feature>: parse block data and handle null value (#10353)

上级 c4aa0be0
......@@ -99,7 +99,6 @@ public class RestfulDriver extends AbstractDriver {
} catch (InterruptedException e) {
throw new SQLException("creat websocket connection has been Interrupted ", e);
}
// TODO fetch Type from config
props.setProperty(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT, String.valueOf(TimestampFormat.TIMESTAMP));
return new WSConnection(url, props, transport, database);
}
......
......@@ -11,81 +11,97 @@ public class NullType {
public static boolean isBooleanNull(byte val) {
return val == NullType.NULL_BOOL_VAL;
}
public static boolean isTinyIntNull(byte val) {
return val == Byte.MIN_VALUE;
}
public static boolean isUnsignedTinyIntNull(byte val) {
return val == (byte) 0xFF;
}
public static boolean isSmallIntNull(short val) {
return val == Short.MIN_VALUE;
}
public static boolean isUnsignedSmallIntNull(short val) {
return val == (short) 0xFFFF;
}
public static boolean isIntNull(int val) {
return val == Integer.MIN_VALUE;
}
public static boolean isUnsignedIntNull(int val) {
return val == 0xFFFFFFFF;
}
public static boolean isBigIntNull(long val) {
return val == Long.MIN_VALUE;
}
public static boolean isUnsignedBigIntNull(long val) {
return val == 0xFFFFFFFFFFFFFFFFL;
}
public static boolean isFloatNull(float val) {
return Float.isNaN(val);
}
public static boolean isDoubleNull(double val) {
return Double.isNaN(val);
}
public static boolean isBinaryNull(byte[] val, int length) {
if (length != Byte.BYTES) {
return false;
}
return val[0] == 0xFF;
return val[0] == (byte) 0xFF;
}
public static boolean isNcharNull(byte[] val, int length) {
if (length != Integer.BYTES) {
return false;
}
return (val[0] & val[1] & val[2] & val[3]) == 0xFF;
return (val[0] & val[1] & val[2] & val[3] & 0xFF) == 0xFF;
}
public static byte getBooleanNull() {
return NullType.NULL_BOOL_VAL;
return NullType.NULL_BOOL_VAL;
}
public static byte getTinyintNull() {
return Byte.MIN_VALUE;
return Byte.MIN_VALUE;
}
public static int getIntNull() {
return Integer.MIN_VALUE;
return Integer.MIN_VALUE;
}
public static short getSmallIntNull() {
return Short.MIN_VALUE;
return Short.MIN_VALUE;
}
public static long getBigIntNull() {
return Long.MIN_VALUE;
return Long.MIN_VALUE;
}
public static int getFloatNull() {
return 0x7FF00000;
return 0x7FF00000;
}
public static long getDoubleNull() {
return 0x7FFFFF0000000000L;
return 0x7FFFFF0000000000L;
}
public static byte getBinaryNull() {
return (byte) 0xFF;
return (byte) 0xFF;
}
public static byte[] getNcharNull() {
return new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
return new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
}
}
......@@ -20,26 +20,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public abstract class AbstractWSResultSet extends AbstractResultSet {
public static DateTimeFormatter rfc3339Parser = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendValue(ChronoField.YEAR, 4)
.appendLiteral('-')
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendLiteral('-')
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.appendLiteral('T')
.appendValue(ChronoField.HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(ChronoField.MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(ChronoField.SECOND_OF_MINUTE, 2)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 2, 9, true)
.optionalEnd()
.appendOffset("+HH:MM", "Z").toFormatter()
.withResolverStyle(ResolverStyle.STRICT)
.withChronology(IsoChronology.INSTANCE);
protected final Statement statement;
protected final Transport transport;
protected final RequestFactory factory;
......@@ -108,7 +88,7 @@ public abstract class AbstractWSResultSet extends AbstractResultSet {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, fetchResp.getMessage());
}
this.reset();
if (fetchResp.isCompleted()) {
if (fetchResp.isCompleted() || fetchResp.getRows() == 0) {
this.isCompleted = true;
return false;
}
......
......@@ -39,9 +39,9 @@ public class WSConnection extends AbstractConnection {
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
// return new WSPreparedStatement();
return null;
//TODO
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
// return new WSPreparedStatement(transport, database, this, factory, sql);
}
@Override
......
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.utils.Utils;
import com.taosdata.jdbc.ws.entity.RequestFactory;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.*;
import java.util.ArrayList;
import java.util.Calendar;
public class WSPreparedStatement extends WSStatement implements PreparedStatement {
private final String rawSql;
private Object[] parameters;
private ArrayList<TableTagInfo> tableTags;
private ArrayList<ColumnInfo> colData;
public WSPreparedStatement(Transport transport, String database, Connection connection, RequestFactory factory, String rawSql) {
super(transport, database, connection, factory);
this.rawSql = rawSql;
int parameterCnt = 0;
if (rawSql.contains("?")) {
for (int i = 0; i < rawSql.length(); i++) {
if ('?' == rawSql.charAt(i)) {
parameterCnt++;
}
}
this.parameters = new Object[parameterCnt];
this.colData = new ArrayList<>();
this.tableTags = new ArrayList<>();
}
}
@Override
public ResultSet executeQuery() throws SQLException {
final String sql = Utils.getNativeSql(this.rawSql, this.parameters);
return executeQuery(sql);
}
@Override
public int executeUpdate() throws SQLException {
return 0;
}
@Override
public void setNull(int parameterIndex, int sqlType) throws SQLException {
}
@Override
public void setBoolean(int parameterIndex, boolean x) throws SQLException {
}
@Override
public void setByte(int parameterIndex, byte x) throws SQLException {
}
@Override
public void setShort(int parameterIndex, short x) throws SQLException {
}
@Override
public void setInt(int parameterIndex, int x) throws SQLException {
}
@Override
public void setLong(int parameterIndex, long x) throws SQLException {
}
@Override
public void setFloat(int parameterIndex, float x) throws SQLException {
}
@Override
public void setDouble(int parameterIndex, double x) throws SQLException {
}
@Override
public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
}
@Override
public void setString(int parameterIndex, String x) throws SQLException {
}
@Override
public void setBytes(int parameterIndex, byte[] x) throws SQLException {
}
@Override
public void setDate(int parameterIndex, Date x) throws SQLException {
}
@Override
public void setTime(int parameterIndex, Time x) throws SQLException {
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
}
@Override
public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
}
@Override
public void clearParameters() throws SQLException {
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
}
@Override
public void setObject(int parameterIndex, Object x) throws SQLException {
}
@Override
public boolean execute() throws SQLException {
return false;
}
@Override
public void addBatch() throws SQLException {
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
}
@Override
public void setRef(int parameterIndex, Ref x) throws SQLException {
}
@Override
public void setBlob(int parameterIndex, Blob x) throws SQLException {
}
@Override
public void setClob(int parameterIndex, Clob x) throws SQLException {
}
@Override
public void setArray(int parameterIndex, Array x) throws SQLException {
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
return null;
}
@Override
public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
}
@Override
public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
}
@Override
public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
}
@Override
public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
}
@Override
public void setURL(int parameterIndex, URL x) throws SQLException {
}
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
return null;
}
@Override
public void setRowId(int parameterIndex, RowId x) throws SQLException {
}
@Override
public void setNString(int parameterIndex, String value) throws SQLException {
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
}
@Override
public void setNClob(int parameterIndex, NClob value) throws SQLException {
}
@Override
public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
}
@Override
public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
}
@Override
public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
}
@Override
public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
}
@Override
public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
}
@Override
public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
}
@Override
public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
}
@Override
public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
}
@Override
public void setClob(int parameterIndex, Reader reader) throws SQLException {
}
@Override
public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
}
@Override
public void setNClob(int parameterIndex, Reader reader) throws SQLException {
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return null;
}
@Override
public int executeUpdate(String sql) throws SQLException {
return 0;
}
@Override
public void close() throws SQLException {
}
@Override
public int getMaxFieldSize() throws SQLException {
return 0;
}
@Override
public void setMaxFieldSize(int max) throws SQLException {
}
@Override
public int getMaxRows() throws SQLException {
return 0;
}
@Override
public void setMaxRows(int max) throws SQLException {
}
@Override
public void setEscapeProcessing(boolean enable) throws SQLException {
}
@Override
public int getQueryTimeout() throws SQLException {
return 0;
}
@Override
public void setQueryTimeout(int seconds) throws SQLException {
}
@Override
public void cancel() throws SQLException {
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public void setCursorName(String name) throws SQLException {
}
@Override
public boolean execute(String sql) throws SQLException {
return false;
}
@Override
public ResultSet getResultSet() throws SQLException {
return null;
}
@Override
public int getUpdateCount() throws SQLException {
return 0;
}
@Override
public boolean getMoreResults() throws SQLException {
return false;
}
@Override
public void setFetchDirection(int direction) throws SQLException {
}
@Override
public int getFetchDirection() throws SQLException {
return 0;
}
@Override
public void setFetchSize(int rows) throws SQLException {
}
@Override
public int getFetchSize() throws SQLException {
return 0;
}
@Override
public int getResultSetConcurrency() throws SQLException {
return 0;
}
@Override
public int getResultSetType() throws SQLException {
return 0;
}
@Override
public void addBatch(String sql) throws SQLException {
}
@Override
public void clearBatch() throws SQLException {
}
@Override
public int[] executeBatch() throws SQLException {
return new int[0];
}
@Override
public Connection getConnection() throws SQLException {
return null;
}
@Override
public boolean getMoreResults(int current) throws SQLException {
return false;
}
@Override
public ResultSet getGeneratedKeys() throws SQLException {
return null;
}
@Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return 0;
}
@Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
return 0;
}
@Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
return false;
}
@Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
return false;
}
@Override
public int getResultSetHoldability() throws SQLException {
return 0;
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public void setPoolable(boolean poolable) throws SQLException {
}
@Override
public boolean isPoolable() throws SQLException {
return false;
}
@Override
public void closeOnCompletion() throws SQLException {
}
@Override
public boolean isCloseOnCompletion() throws SQLException {
return false;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
private static class ColumnInfo {
@SuppressWarnings("rawtypes")
private ArrayList data;
private int type;
private int bytes;
private boolean typeIsSet;
public ColumnInfo() {
this.typeIsSet = false;
}
public void setType(int type) throws SQLException {
if (this.isTypeSet()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "column data type has been set");
}
this.typeIsSet = true;
this.type = type;
}
public boolean isTypeSet() {
return this.typeIsSet;
}
}
private static class TableTagInfo {
private boolean isNull;
private final Object value;
private final int type;
public TableTagInfo(Object value, int type) {
this.value = value;
this.type = type;
}
public static TableTagInfo createNullTag(int type) {
TableTagInfo info = new TableTagInfo(null, type);
info.isNull = true;
return info;
}
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.AbstractStatement;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
......@@ -14,7 +15,7 @@ import java.util.concurrent.ExecutionException;
public class WSStatement extends AbstractStatement {
private final Transport transport;
private final String database;
private String database;
private final Connection connection;
private final RequestFactory factory;
......@@ -71,6 +72,11 @@ public class WSStatement extends AbstractStatement {
if (Code.SUCCESS.getCode() != queryResp.getCode()) {
throw TSDBError.createSQLException(queryResp.getCode(), queryResp.getMessage());
}
if (SqlSyntaxValidator.isUseSql(sql)) {
this.database = sql.trim().replace("use", "").trim();
this.connection.setCatalog(this.database);
this.connection.setClientInfo(TSDBDriver.PROPERTY_KEY_DBNAME, this.database);
}
if (queryResp.isUpdate()) {
this.resultSet = null;
this.affectedRows = queryResp.getAffectedRows();
......
......@@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit;
@RunWith(CatalogRunner.class)
@TestTarget(alias = "test connection with server", author = "huolibo", version = "2.0.37")
public class WSConnectionTest {
// private static final String host = "192.168.1.98";
// private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041;
private Connection connection;
......@@ -27,7 +27,7 @@ public class WSConnectionTest {
@Test
@Description("normal test with websocket server")
public void normalConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=root&password=taosdata";
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/log?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
......@@ -56,7 +56,7 @@ public class WSConnectionTest {
@Test(expected = SQLException.class)
@Description("wrong password or user")
public void wrongUserOrPasswordConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata";
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/log?user=abc&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
......@@ -69,13 +69,13 @@ public class WSConnectionTest {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
TimeUnit.MINUTES.sleep(1);
TimeUnit.SECONDS.sleep(20);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("show databases");
TimeUnit.MINUTES.sleep(1);
TimeUnit.SECONDS.sleep(20);
resultSet.next();
System.out.println(resultSet.getTimestamp(1));
resultSet.close();
statement.close();
connection.close();
}
}
......@@ -11,12 +11,6 @@ import org.junit.runners.MethodSorters;
import java.sql.*;
import java.util.Properties;
/**
* Most of the functionality is consistent with {@link com.taosdata.jdbc.JsonTagTest},
* Except for batchInsert, which is not supported by restful API.
* Restful could not distinguish between empty and nonexistent of json value, the result is always null.
* The order of json results may change due to serialization and deserialization
*/
@Ignore
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(CatalogRunner.class)
......
......@@ -9,7 +9,9 @@ import org.junit.runner.RunWith;
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@Ignore
......@@ -17,7 +19,8 @@ import java.util.stream.IntStream;
@TestTarget(alias = "query test", author = "huolibo", version = "2.0.38")
@FixMethodOrder
public class WSQueryTest {
private static final String host = "192.168.1.98";
// private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041;
private static final String databaseName = "ws_query";
private static final String tableName = "wq";
......@@ -26,8 +29,9 @@ public class WSQueryTest {
@Description("query")
@Test
public void queryBlock() throws SQLException, InterruptedException {
IntStream.range(1, 100).limit(1000).parallel().forEach(x -> {
public void queryBlock() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1000);
IntStream.range(1, 10000).limit(1000).parallel().forEach(x -> {
try {
Statement statement = connection.createStatement();
......@@ -37,18 +41,17 @@ public class WSQueryTest {
resultSet.next();
Assert.assertEquals(100, resultSet.getInt(2));
statement.close();
TimeUnit.SECONDS.sleep(10);
latch.countDown();
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
latch.await();
}
@Before
public void before() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=root&password=taosdata";
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/log?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册