未验证 提交 992ec97e 编写于 作者: H huolibo 提交者: GitHub

[TD-11946]<feat>: implement jdbc with websocket and get/parse JSON resultSet (#10221)

* [TD-11946]<feat>: implement the JDBC framwork

* [TD-11946]<feat>: Get the resultset and parse the resultset

add fetch file

* remove useless file

* [TD-11946]<feat>: add close method release resource

* [TD-11946]<feat>: remove redundant code

* [TD-11946]<feat>: complete websocket parse block data, remove json format

* [TD-11946]<feat>: ignore websocket test
上级 200a9291
...@@ -3,6 +3,7 @@ package com.taosdata.jdbc.rs; ...@@ -3,6 +3,7 @@ package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.*; import com.taosdata.jdbc.*;
import com.taosdata.jdbc.enums.TimestampFormat;
import com.taosdata.jdbc.utils.HttpClientPoolUtil; import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import com.taosdata.jdbc.ws.InFlightRequest; import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport; import com.taosdata.jdbc.ws.Transport;
...@@ -77,18 +78,20 @@ public class RestfulDriver extends AbstractDriver { ...@@ -77,18 +78,20 @@ public class RestfulDriver extends AbstractDriver {
int maxRequest = props.containsKey(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST) int maxRequest = props.containsKey(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST)
? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST)) ? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST))
: Transport.DEFAULT_MAX_REQUEST; : Transport.DEFAULT_MAX_REQUEST;
InFlightRequest inFlightRequest = new InFlightRequest(timeout, maxRequest); InFlightRequest inFlightRequest = new InFlightRequest(timeout, maxRequest);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Map<String, String> httpHeaders = new HashMap<>(); Map<String, String> httpHeaders = new HashMap<>();
client = new WSClient(new URI(loginUrl), user, password, database, inFlightRequest, httpHeaders, latch, maxRequest); client = new WSClient(new URI(loginUrl), user, password, database,
inFlightRequest, httpHeaders, latch, maxRequest);
transport = new Transport(client, inFlightRequest); transport = new Transport(client, inFlightRequest);
if (!client.connectBlocking()) { if (!client.connectBlocking(timeout, TimeUnit.MILLISECONDS)) {
throw new SQLException("can't create connection with server"); throw new SQLException("can't create connection with server");
} }
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw new SQLException("auth timeout"); throw new SQLException("auth timeout");
} }
if (client.isAuth()) { if (!client.isAuth()) {
throw new SQLException("auth failure"); throw new SQLException("auth failure");
} }
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
...@@ -96,7 +99,9 @@ public class RestfulDriver extends AbstractDriver { ...@@ -96,7 +99,9 @@ public class RestfulDriver extends AbstractDriver {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new SQLException("creat websocket connection has been Interrupted ", e); throw new SQLException("creat websocket connection has been Interrupted ", e);
} }
return new WSConnection(url, props, transport, database, true); // TODO fetch Type from config
props.setProperty(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT, String.valueOf(TimestampFormat.TIMESTAMP));
return new WSConnection(url, props, transport, database);
} }
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + ""; loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
int poolSize = Integer.parseInt(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE)); int poolSize = Integer.parseInt(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE));
......
...@@ -302,6 +302,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -302,6 +302,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
this.taos_type = taos_type; this.taos_type = taos_type;
} }
public int getTaosType() {
return taos_type;
}
} }
@Override @Override
......
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.*;
import com.taosdata.jdbc.rs.RestfulResultSet;
import com.taosdata.jdbc.rs.RestfulResultSetMetaData;
import com.taosdata.jdbc.ws.entity.*;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.ResolverStyle;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public abstract class AbstractWSResultSet extends AbstractResultSet {
public static DateTimeFormatter rfc3339Parser = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendValue(ChronoField.YEAR, 4)
.appendLiteral('-')
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendLiteral('-')
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.appendLiteral('T')
.appendValue(ChronoField.HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(ChronoField.MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(ChronoField.SECOND_OF_MINUTE, 2)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 2, 9, true)
.optionalEnd()
.appendOffset("+HH:MM", "Z").toFormatter()
.withResolverStyle(ResolverStyle.STRICT)
.withChronology(IsoChronology.INSTANCE);
protected final Statement statement;
protected final Transport transport;
protected final RequestFactory factory;
protected final long queryId;
protected boolean isClosed;
// meta
protected final ResultSetMetaData metaData;
protected final List<RestfulResultSet.Field> fields = new ArrayList<>();
protected final List<String> columnNames;
protected List<Integer> fieldLength;
// data
protected List<List<Object>> result = new ArrayList<>();
protected int numOfRows = 0;
protected int rowIndex = 0;
private boolean isCompleted;
public AbstractWSResultSet(Statement statement, Transport transport, RequestFactory factory,
QueryResp response, String database) throws SQLException {
this.statement = statement;
this.transport = transport;
this.factory = factory;
this.queryId = response.getId();
columnNames = Arrays.asList(response.getFieldsNames());
for (int i = 0; i < response.getFieldsCount(); i++) {
String colName = response.getFieldsNames()[i];
int taosType = response.getFieldsTypes()[i];
int jdbcType = TSDBConstants.taosType2JdbcType(taosType);
int length = response.getFieldsLengths()[i];
fields.add(new RestfulResultSet.Field(colName, jdbcType, length, "", taosType));
}
this.metaData = new RestfulResultSetMetaData(database, fields, null);
this.timestampPrecision = response.getPrecision();
}
private boolean forward() {
if (this.rowIndex > this.numOfRows) {
return false;
}
return ((++this.rowIndex) < this.numOfRows);
}
public void reset() {
this.rowIndex = 0;
}
@Override
public boolean next() throws SQLException {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
}
if (this.forward()) {
return true;
}
Request request = factory.generateFetch(queryId);
CompletableFuture<Response> send = transport.send(request);
try {
Response response = send.get();
FetchResp fetchResp = (FetchResp) response;
if (Code.SUCCESS.getCode() != fetchResp.getCode()) {
// TODO reWrite error type
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, fetchResp.getMessage());
}
this.reset();
if (fetchResp.isCompleted()) {
this.isCompleted = true;
return false;
}
fieldLength = Arrays.asList(fetchResp.getLengths());
this.numOfRows = fetchResp.getRows();
this.result = fetchJsonData();
return true;
} catch (InterruptedException | ExecutionException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_IOException, e.getMessage());
}
}
public abstract List<List<Object>> fetchJsonData() throws SQLException, ExecutionException, InterruptedException;
@Override
public void close() throws SQLException {
this.isClosed = true;
if (result != null && !result.isEmpty() && !isCompleted) {
FetchReq fetchReq = new FetchReq(queryId, queryId);
transport.sendWithoutRep(new Request(Action.FREE_RESULT.getAction(), fetchReq));
}
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
return this.metaData;
}
@Override
public boolean isClosed() throws SQLException {
return isClosed;
}
}
package com.taosdata.jdbc.ws; package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.ws.entity.Action;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* Unfinished execution * Unfinished execution
*/ */
public class InFlightRequest implements AutoCloseable { public class InFlightRequest {
private final int timeoutSec; private final int timeoutSec;
private final Semaphore semaphore; private final Semaphore semaphore;
private final Map<String, ResponseFuture> futureMap = new ConcurrentHashMap<>(); private final Map<String, ConcurrentHashMap<Long, ResponseFuture>> futureMap = new HashMap<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final Map<String, PriorityBlockingQueue<ResponseFuture>> expireMap = new HashMap<>();
private final ScheduledFuture<?> scheduledFuture; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("timer-" + t.getId());
return t;
});
public InFlightRequest(int timeoutSec, int concurrentNum) { public InFlightRequest(int timeoutSec, int concurrentNum) {
this.timeoutSec = timeoutSec; this.timeoutSec = timeoutSec;
this.semaphore = new Semaphore(concurrentNum); this.semaphore = new Semaphore(concurrentNum);
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::removeTimeoutFuture, timeoutSec, timeoutSec, TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleWithFixedDelay(this::removeTimeoutFuture,
timeoutSec, timeoutSec, TimeUnit.MILLISECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(scheduledExecutorService::shutdown));
for (Action value : Action.values()) {
String action = value.getAction();
if (Action.CONN.getAction().equals(action))
continue;
futureMap.put(action, new ConcurrentHashMap<>());
expireMap.put(action, new PriorityBlockingQueue<>());
}
} }
public void put(ResponseFuture responseFuture) throws InterruptedException, TimeoutException { public void put(ResponseFuture rf) throws InterruptedException, TimeoutException {
if (semaphore.tryAcquire(timeoutSec, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(timeoutSec, TimeUnit.MILLISECONDS)) {
futureMap.put(responseFuture.getId(), responseFuture); futureMap.get(rf.getAction()).put(rf.getId(), rf);
expireMap.get(rf.getAction()).put(rf);
} else { } else {
throw new TimeoutException(); throw new TimeoutException();
} }
} }
public ResponseFuture remove(String id) { public ResponseFuture remove(String action, Long id) {
ResponseFuture future = futureMap.remove(id); ResponseFuture future = futureMap.get(action).remove(id);
if (null != future) { if (null != future) {
expireMap.get(action).remove(future);
semaphore.release(); semaphore.release();
} }
return future; return future;
} }
private void removeTimeoutFuture() { private void removeTimeoutFuture() {
futureMap.entrySet().removeIf(entry -> { expireMap.forEach((k, v) -> {
if (System.nanoTime() - entry.getValue().getTimestamp() > timeoutSec * 1_000_000L) { while (true) {
ResponseFuture response = v.peek();
if (null == response || (System.nanoTime() - response.getTimestamp()) < timeoutSec * 1_000_000L)
break;
try { try {
entry.getValue().getFuture().completeExceptionally(new TimeoutException()); v.poll();
}finally { futureMap.get(k).remove(response.getId());
response.getFuture().completeExceptionally(new TimeoutException());
} finally {
semaphore.release(); semaphore.release();
} }
return true;
} else {
return false;
} }
}); });
} }
@Override
public void close() {
scheduledFuture.cancel(true);
scheduledExecutorService.shutdown();
}
} }
...@@ -4,18 +4,24 @@ import com.taosdata.jdbc.ws.entity.Response; ...@@ -4,18 +4,24 @@ import com.taosdata.jdbc.ws.entity.Response;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class ResponseFuture { public class ResponseFuture implements Comparable<ResponseFuture> {
private final String id; private final String action;
private final Long id;
private final CompletableFuture<Response> future; private final CompletableFuture<Response> future;
private final long timestamp; private final long timestamp;
public ResponseFuture(String id, CompletableFuture<Response> future) { public ResponseFuture(String action, Long id, CompletableFuture<Response> future) {
this.action = action;
this.id = id; this.id = id;
this.future = future; this.future = future;
timestamp = System.nanoTime(); timestamp = System.nanoTime();
} }
public String getId() { public String getAction() {
return action;
}
public Long getId() {
return id; return id;
} }
...@@ -26,4 +32,12 @@ public class ResponseFuture { ...@@ -26,4 +32,12 @@ public class ResponseFuture {
long getTimestamp() { long getTimestamp() {
return timestamp; return timestamp;
} }
@Override
public int compareTo(ResponseFuture rf) {
long r = this.timestamp - rf.timestamp;
if (r > 0) return 1;
if (r < 0) return -1;
return 0;
}
} }
...@@ -25,15 +25,19 @@ public class Transport implements AutoCloseable { ...@@ -25,15 +25,19 @@ public class Transport implements AutoCloseable {
public CompletableFuture<Response> send(Request request) { public CompletableFuture<Response> send(Request request) {
CompletableFuture<Response> completableFuture = new CompletableFuture<>(); CompletableFuture<Response> completableFuture = new CompletableFuture<>();
try { try {
inFlightRequest.put(new ResponseFuture(request.id(), completableFuture)); inFlightRequest.put(new ResponseFuture(request.getAction(), request.id(), completableFuture));
client.send(request.toString()); client.send(request.toString());
} catch (Throwable t) { } catch (Throwable t) {
inFlightRequest.remove(request.id()); inFlightRequest.remove(request.getAction(), request.id());
completableFuture.completeExceptionally(t); completableFuture.completeExceptionally(t);
} }
return completableFuture; return completableFuture;
} }
public void sendWithoutRep(Request request) {
client.send(request.toString());
}
public boolean isClosed() throws SQLException { public boolean isClosed() throws SQLException {
return client.isClosed(); return client.isClosed();
} }
......
...@@ -7,6 +7,8 @@ import org.java_websocket.handshake.ServerHandshake; ...@@ -7,6 +7,8 @@ import org.java_websocket.handshake.ServerHandshake;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
...@@ -20,6 +22,7 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -20,6 +22,7 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
ThreadPoolExecutor executor; ThreadPoolExecutor executor;
private boolean auth; private boolean auth;
private int reqId;
public boolean isAuth() { public boolean isAuth() {
return auth; return auth;
...@@ -54,8 +57,8 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -54,8 +57,8 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
@Override @Override
public void onOpen(ServerHandshake serverHandshake) { public void onOpen(ServerHandshake serverHandshake) {
// certification // certification
Request request = Request.generateConnect(user, password, database); ConnectReq connectReq = new ConnectReq(++reqId, user, password, database);
this.send(request.toString()); this.send(new Request(Action.CONN.getAction(), connectReq).toString());
} }
@Override @Override
...@@ -64,14 +67,15 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -64,14 +67,15 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
executor.submit(() -> { executor.submit(() -> {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject jsonObject = JSONObject.parseObject(message);
if (Action.CONN.getAction().equals(jsonObject.getString("action"))) { if (Action.CONN.getAction().equals(jsonObject.getString("action"))) {
latch.countDown();
if (Code.SUCCESS.getCode() != jsonObject.getInteger("code")) { if (Code.SUCCESS.getCode() != jsonObject.getInteger("code")) {
auth = false;
this.close(); this.close();
} else {
auth = true;
} }
latch.countDown();
} else { } else {
Response response = parseMessage(jsonObject); Response response = parseMessage(jsonObject);
ResponseFuture remove = inFlightRequest.remove(response.id()); ResponseFuture remove = inFlightRequest.remove(response.getAction(), response.getReqId());
if (null != remove) { if (null != remove) {
remove.getFuture().complete(response); remove.getFuture().complete(response);
} }
...@@ -87,7 +91,14 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -87,7 +91,14 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
@Override @Override
public void onMessage(ByteBuffer bytes) { public void onMessage(ByteBuffer bytes) {
super.onMessage(bytes); bytes.order(ByteOrder.LITTLE_ENDIAN);
long id = bytes.getLong();
ResponseFuture remove = inFlightRequest.remove(Action.FETCH_BLOCK.getAction(), id);
if (null != remove) {
// FetchBlockResp fetchBlockResp = new FetchBlockResp(id, bytes.slice());
FetchBlockResp fetchBlockResp = new FetchBlockResp(id, bytes);
remove.getFuture().complete(fetchBlockResp);
}
} }
@Override @Override
...@@ -97,7 +108,6 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -97,7 +108,6 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
} else { } else {
throw new RuntimeException("close connection: " + reason); throw new RuntimeException("close connection: " + reason);
} }
} }
@Override @Override
...@@ -109,6 +119,42 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -109,6 +119,42 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
public void close() { public void close() {
super.close(); super.close();
executor.shutdown(); executor.shutdown();
inFlightRequest.close(); }
static class ConnectReq extends Payload {
private String user;
private String password;
private String db;
public ConnectReq(long reqId, String user, String password, String db) {
super(reqId);
this.user = user;
this.password = password;
this.db = db;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDb() {
return db;
}
public void setDb(String db) {
this.db = db;
}
} }
} }
...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBDriver; ...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError; import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers; import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.rs.RestfulDatabaseMetaData; import com.taosdata.jdbc.rs.RestfulDatabaseMetaData;
import com.taosdata.jdbc.ws.entity.RequestFactory;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
...@@ -16,14 +17,14 @@ public class WSConnection extends AbstractConnection { ...@@ -16,14 +17,14 @@ public class WSConnection extends AbstractConnection {
private final Transport transport; private final Transport transport;
private final DatabaseMetaData metaData; private final DatabaseMetaData metaData;
private final String database; private final String database;
private boolean fetchType; private final RequestFactory factory;
public WSConnection(String url, Properties properties, Transport transport, String database, boolean fetchType) { public WSConnection(String url, Properties properties, Transport transport, String database) {
super(properties); super(properties);
this.transport = transport; this.transport = transport;
this.database = database; this.database = database;
this.fetchType = fetchType;
this.metaData = new RestfulDatabaseMetaData(url, properties.getProperty(TSDBDriver.PROPERTY_KEY_USER), this); this.metaData = new RestfulDatabaseMetaData(url, properties.getProperty(TSDBDriver.PROPERTY_KEY_USER), this);
this.factory = new RequestFactory();
} }
@Override @Override
...@@ -31,8 +32,7 @@ public class WSConnection extends AbstractConnection { ...@@ -31,8 +32,7 @@ public class WSConnection extends AbstractConnection {
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
// return new WSStatement(transport, database , fetchType); return new WSStatement(transport, database, this, factory);
return null;
} }
@Override @Override
......
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.AbstractStatement;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
import com.taosdata.jdbc.ws.entity.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WSStatement extends AbstractStatement {
private final Transport transport;
private final String database;
private final Connection connection;
private final RequestFactory factory;
private boolean closed;
private ResultSet resultSet;
public WSStatement(Transport transport, String database, Connection connection, RequestFactory factory) {
this.transport = transport;
this.database = database;
this.connection = connection;
this.factory = factory;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecuteQuery(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_QUERY, "not a valid sql for executeQuery: " + sql);
this.execute(sql);
return this.resultSet;
}
@Override
public int executeUpdate(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
this.execute(sql);
return affectedRows;
}
@Override
public void close() throws SQLException {
if (!isClosed())
this.closed = true;
}
@Override
public boolean execute(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
Request request = factory.generateQuery(sql);
CompletableFuture<Response> send = transport.send(request);
Response response;
try {
response = send.get();
QueryResp queryResp = (QueryResp) response;
if (Code.SUCCESS.getCode() != queryResp.getCode()) {
throw TSDBError.createSQLException(queryResp.getCode(), queryResp.getMessage());
}
if (queryResp.isUpdate()) {
this.resultSet = null;
this.affectedRows = queryResp.getAffectedRows();
return false;
} else {
this.resultSet = new BlockResultSet(this, this.transport, this.factory, queryResp, this.database);
this.affectedRows = -1;
return true;
}
} catch (InterruptedException | ExecutionException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY, e.getMessage());
}
}
@Override
public ResultSet getResultSet() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
return this.resultSet;
}
@Override
public int getUpdateCount() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
return affectedRows;
}
@Override
public Connection getConnection() throws SQLException {
return this.connection;
}
@Override
public boolean isClosed() throws SQLException {
return closed;
}
}
...@@ -11,8 +11,9 @@ public enum Action { ...@@ -11,8 +11,9 @@ public enum Action {
QUERY("query", QueryResp.class), QUERY("query", QueryResp.class),
FETCH("fetch", FetchResp.class), FETCH("fetch", FetchResp.class),
FETCH_JSON("fetch_json", FetchJsonResp.class), FETCH_JSON("fetch_json", FetchJsonResp.class),
// fetch_block's class is meaningless FETCH_BLOCK("fetch_block", FetchBlockResp.class),
FETCH_BLOCK("fetch_block", Response.class), // free_result's class is meaningless
FREE_RESULT("free_result", Response.class),
; ;
private final String action; private final String action;
private final Class<? extends Response> clazz; private final Class<? extends Response> clazz;
...@@ -35,7 +36,6 @@ public enum Action { ...@@ -35,7 +36,6 @@ public enum Action {
static { static {
for (Action value : Action.values()) { for (Action value : Action.values()) {
actions.put(value.action, value); actions.put(value.action, value);
IdUtil.init(value.action);
} }
} }
......
...@@ -5,7 +5,6 @@ package com.taosdata.jdbc.ws.entity; ...@@ -5,7 +5,6 @@ package com.taosdata.jdbc.ws.entity;
*/ */
public enum Code { public enum Code {
SUCCESS(0, "success"), SUCCESS(0, "success"),
; ;
private final int code; private final int code;
......
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
public class FetchBlockResp { import java.nio.ByteBuffer;
public class FetchBlockResp extends Response {
private ByteBuffer buffer;
public FetchBlockResp(long id, ByteBuffer buffer) {
this.setAction(Action.FETCH_BLOCK.getAction());
this.setReqId(id);
this.buffer = buffer;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
} }
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.JSONArray;
public class FetchJsonResp extends Response{ public class FetchJsonResp extends Response{
private long id; private long id;
private Object[][] data; private JSONArray data;
public Object[][] getData() { public JSONArray getData() {
return data; return data;
} }
public void setData(Object[][] data) { public void setData(JSONArray data) {
this.data = data; this.data = data;
} }
......
package com.taosdata.jdbc.ws.entity;
public class FetchReq extends Payload {
private long id;
public FetchReq(long reqId, long id) {
super(reqId);
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
}
...@@ -8,7 +8,7 @@ public class FetchResp extends Response{ ...@@ -8,7 +8,7 @@ public class FetchResp extends Response{
private String message; private String message;
private long id; private long id;
private boolean completed; private boolean completed;
private int[] lengths; private Integer[] lengths;
private int rows; private int rows;
public int getCode() { public int getCode() {
...@@ -43,11 +43,11 @@ public class FetchResp extends Response{ ...@@ -43,11 +43,11 @@ public class FetchResp extends Response{
this.completed = completed; this.completed = completed;
} }
public int[] getLengths() { public Integer[] getLengths() {
return lengths; return lengths;
} }
public void setLengths(int[] lengths) { public void setLengths(Integer[] lengths) {
this.lengths = lengths; this.lengths = lengths;
} }
......
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* generate id for request
*/
public class IdUtil {
private static final Map<String, AtomicLong> ids = new HashMap<>();
public static long getId(String action) {
return ids.get(action).incrementAndGet();
}
public static void init(String action) {
ids.put(action, new AtomicLong(0));
}
}
package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.annotation.JSONField;
public class Payload {
@JSONField(name = "req_id")
private final long reqId;
public Payload(long reqId) {
this.reqId = reqId;
}
public long getReqId() {
return reqId;
}
}
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
public class QueryReq extends Payload {
private String sql;
public QueryReq(long reqId, String sql) {
super(reqId);
this.sql = sql;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
/** /**
* send to taosadapter * send to taosadapter
...@@ -15,14 +14,14 @@ public class Request { ...@@ -15,14 +14,14 @@ public class Request {
this.args = args; this.args = args;
} }
public String id() {
return action + "_" + args.getReqId();
}
public String getAction() { public String getAction() {
return action; return action;
} }
public Long id(){
return args.getReqId();
}
public void setAction(String action) { public void setAction(String action) {
this.action = action; this.action = action;
} }
...@@ -39,118 +38,4 @@ public class Request { ...@@ -39,118 +38,4 @@ public class Request {
public String toString() { public String toString() {
return JSON.toJSONString(this); return JSON.toJSONString(this);
} }
public static Request generateConnect(String user, String password, String db) {
long reqId = IdUtil.getId(Action.CONN.getAction());
ConnectReq connectReq = new ConnectReq(reqId, user, password, db);
return new Request(Action.CONN.getAction(), connectReq);
}
public static Request generateQuery(String sql) {
long reqId = IdUtil.getId(Action.QUERY.getAction());
QueryReq queryReq = new QueryReq(reqId, sql);
return new Request(Action.QUERY.getAction(), queryReq);
}
public static Request generateFetch(long id) {
long reqId = IdUtil.getId(Action.FETCH.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH.getAction(), fetchReq);
}
public static Request generateFetchJson(long id) {
long reqId = IdUtil.getId(Action.FETCH_JSON.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_JSON.getAction(), fetchReq);
}
public static Request generateFetchBlock(long id) {
long reqId = IdUtil.getId(Action.FETCH_BLOCK.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_BLOCK.getAction(), fetchReq);
}
}
class Payload {
@JSONField(name = "req_id")
private final long reqId;
public Payload(long reqId) {
this.reqId = reqId;
}
public long getReqId() {
return reqId;
}
}
class ConnectReq extends Payload {
private String user;
private String password;
private String db;
public ConnectReq(long reqId, String user, String password, String db) {
super(reqId);
this.user = user;
this.password = password;
this.db = db;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDb() {
return db;
}
public void setDb(String db) {
this.db = db;
}
}
class QueryReq extends Payload {
private String sql;
public QueryReq(long reqId, String sql) {
super(reqId);
this.sql = sql;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
class FetchReq extends Payload {
private long id;
public FetchReq(long reqId, long id) {
super(reqId);
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
} }
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* generate id for request
*/
public class RequestFactory {
private final Map<String, AtomicLong> ids = new HashMap<>();
public long getId(String action) {
return ids.get(action).incrementAndGet();
}
public RequestFactory() {
for (Action value : Action.values()) {
String action = value.getAction();
if (Action.CONN.getAction().equals(action) || Action.FETCH_BLOCK.getAction().equals(action))
continue;
ids.put(action, new AtomicLong(0));
}
}
public Request generateQuery(String sql) {
long reqId = this.getId(Action.QUERY.getAction());
QueryReq queryReq = new QueryReq(reqId, sql);
return new Request(Action.QUERY.getAction(), queryReq);
}
public Request generateFetch(long id) {
long reqId = this.getId(Action.FETCH.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH.getAction(), fetchReq);
}
public Request generateFetchJson(long id) {
long reqId = this.getId(Action.FETCH_JSON.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_JSON.getAction(), fetchReq);
}
public Request generateFetchBlock(long id) {
FetchReq fetchReq = new FetchReq(id, id);
return new Request(Action.FETCH_BLOCK.getAction(), fetchReq);
}
}
...@@ -11,10 +11,6 @@ public class Response { ...@@ -11,10 +11,6 @@ public class Response {
@JSONField(name = "req_id") @JSONField(name = "req_id")
private long reqId; private long reqId;
public String id() {
return action + "_" + reqId;
}
public String getAction() { public String getAction() {
return action; return action;
} }
......
...@@ -10,15 +10,17 @@ import org.junit.runner.RunWith; ...@@ -10,15 +10,17 @@ import org.junit.runner.RunWith;
import java.sql.*; import java.sql.*;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
/** /**
* You need to start taosadapter before testing this method * You need to start taosadapter before testing this method
*/ */
@Ignore @Ignore
@RunWith(CatalogRunner.class) @RunWith(CatalogRunner.class)
@TestTarget(alias = "test connection with server", author = "huolibo",version = "2.0.37") @TestTarget(alias = "test connection with server", author = "huolibo", version = "2.0.37")
public class WSConnectionTest { public class WSConnectionTest {
private static final String host = "192.168.1.98"; // private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041; private static final int port = 6041;
private Connection connection; private Connection connection;
...@@ -46,13 +48,12 @@ public class WSConnectionTest { ...@@ -46,13 +48,12 @@ public class WSConnectionTest {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/"; String url = "jdbc:TAOS-RS://" + host + ":" + port + "/";
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root"); properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD,"taosdata"); properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties); connection = DriverManager.getConnection(url, properties);
} }
@Test @Test(expected = SQLException.class)
// @Test(expected = SQLException.class)
@Description("wrong password or user") @Description("wrong password or user")
public void wrongUserOrPasswordConection() throws SQLException { public void wrongUserOrPasswordConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata"; String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata";
...@@ -60,4 +61,21 @@ public class WSConnectionTest { ...@@ -60,4 +61,21 @@ public class WSConnectionTest {
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties); connection = DriverManager.getConnection(url, properties);
} }
@Test
@Description("sleep keep connection")
public void keepConnection() throws SQLException, InterruptedException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
TimeUnit.MINUTES.sleep(1);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("show databases");
TimeUnit.MINUTES.sleep(1);
resultSet.next();
System.out.println(resultSet.getTimestamp(1));
resultSet.close();
statement.close();
}
} }
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.annotation.CatalogRunner;
import com.taosdata.jdbc.annotation.Description;
import com.taosdata.jdbc.annotation.TestTarget;
import org.junit.*;
import org.junit.runner.RunWith;
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@Ignore
@RunWith(CatalogRunner.class)
@TestTarget(alias = "query test", author = "huolibo", version = "2.0.38")
@FixMethodOrder
public class WSQueryTest {
private static final String host = "192.168.1.98";
private static final int port = 6041;
private static final String databaseName = "ws_query";
private static final String tableName = "wq";
private Connection connection;
private long now;
@Description("query")
@Test
public void queryBlock() throws SQLException, InterruptedException {
IntStream.range(1, 100).limit(1000).parallel().forEach(x -> {
try {
Statement statement = connection.createStatement();
statement.execute("insert into " + databaseName + "." + tableName + " values(now+100s, 100)");
ResultSet resultSet = statement.executeQuery("select * from " + databaseName + "." + tableName);
resultSet.next();
Assert.assertEquals(100, resultSet.getInt(2));
statement.close();
TimeUnit.SECONDS.sleep(10);
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Before
public void before() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.execute("drop database if exists " + databaseName);
statement.execute("create database " + databaseName);
statement.execute("use " + databaseName);
statement.execute("create table if not exists " + databaseName + "." + tableName + "(ts timestamp, f int)");
statement.close();
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.enums.TimestampFormat;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Ignore
public class WSSelectTest {
// private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041;
private static Connection connection;
private static final String databaseName = "driver";
private static void testInsert() throws SQLException {
Statement statement = connection.createStatement();
long cur = System.currentTimeMillis();
List<String> timeList = new ArrayList<>();
for (long i = 0L; i < 3000; i++) {
long t = cur + i;
timeList.add("insert into " + databaseName + ".alltype_query values(" + t + ",1,1,1,1,1,1,1,1,1,1,1,'test_binary','test_nchar')");
}
for (int i = 0; i < 3000; i++) {
statement.execute(timeList.get(i));
}
statement.close();
}
@Test
public void testWSSelect() throws SQLException {
Statement statement = connection.createStatement();
int count = 0;
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ResultSet resultSet = statement.executeQuery("select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from " + databaseName + ".alltype_query limit 3000");
while (resultSet.next()) {
count++;
resultSet.getTimestamp(1);
resultSet.getBoolean(2);
resultSet.getInt(3);
resultSet.getInt(4);
resultSet.getInt(5);
resultSet.getLong(6);
resultSet.getInt(7);
resultSet.getInt(8);
resultSet.getLong(9);
resultSet.getLong(10);
resultSet.getFloat(11);
resultSet.getDouble(12);
resultSet.getString(13);
resultSet.getString(14);
}
}
long d = System.nanoTime() - start;
System.out.println(d / 1000);
System.out.println(count);
statement.close();
}
@BeforeClass
public static void beforeClass() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT, String.valueOf(TimestampFormat.UTC));
properties.setProperty(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT, "100000");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.execute("drop database if exists " + databaseName);
statement.execute("create database " + databaseName);
statement.execute("create table " + databaseName + ".alltype_query(ts timestamp, c1 bool,c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 binary(20), c13 nchar(30) )");
statement.close();
testInsert();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册