提交 2aff89c7 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'develop' into feature/TD-10987

...@@ -87,14 +87,14 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专 ...@@ -87,14 +87,14 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
* [taosAdapter](/tools/adapter): TDengine 集群和应用之间的 RESTful 接口适配服务。 * [taosAdapter](/tools/adapter): TDengine 集群和应用之间的 RESTful 接口适配服务。
* [TDinsight](/tools/insight): 监控 TDengine 集群的 Grafana 面板集合。 * [TDinsight](/tools/insight): 监控 TDengine 集群的 Grafana 面板集合。
* [taosdump](/tools/taosdump): TDengine 数据备份工具。使用 taosdump 请安装 taosTools。 * [taosdump](/tools/taosdump): TDengine 数据备份工具。使用 taosdump 请安装 taosTools。
* [taosBenchmark](/tools/taosbenchmark): TDengine 压力测试工具。使用 taosBenchmark 请安装 taosTools。 * [taosBenchmark](/tools/taosbenchmark): TDengine 压力测试工具。
## [与其他工具的连接](/connections) ## [与其他工具的连接](/connections)
* [Grafana](/connections#grafana):获取并可视化保存在TDengine的数据 * [Grafana](/connections#grafana):获取并可视化保存在TDengine的数据
* [IDEA Database](https://www.taosdata.com/blog/2020/08/27/1767.html):通过IDEA 数据库管理工具可视化使用 TDengine * [IDEA Database](https://www.taosdata.com/blog/2020/08/27/1767.html):通过IDEA 数据库管理工具可视化使用 TDengine
* [基于Electron开发的跨平台TDengine图形化管理工具](https://github.com/skye0207/TDengineGUI) * [TDengineGUI](https://github.com/skye0207/TDengineGUI):基于Electron开发的跨平台TDengine图形化管理工具
* [基于DataX的TDeninge数据迁移工具](https://www.taosdata.com/blog/2021/10/26/3156.html) * [DataX](https://www.taosdata.com/blog/2021/10/26/3156.html):支持 TDeninge 和其他数据库之间进行数据迁移的工具
## [TDengine集群的安装、管理](/cluster) ## [TDengine集群的安装、管理](/cluster)
......
...@@ -45,14 +45,6 @@ sudo apt-get install tdengine ...@@ -45,14 +45,6 @@ sudo apt-get install tdengine
<ul id="client-packagelist" class="package-list"></ul> <ul id="client-packagelist" class="package-list"></ul>
### 安装 taosTools
taosTools 是多个用于 TDengine 的辅助工具软件集合。目前包含用于数据备份恢复的 taosdump 和用于安装 grafanaplugin 和 dashboard 的脚本 TDinsight.sh。
运行 taosdump 需要安装 TDengine server 或 TDengine client 安装包,推荐使用 deb 或 rpm 格式安装包,方便安装依赖软件。
<ul id="taos-tools" class="package-list"></ul>
### <a class="anchor" id="source-install"></a>通过源码安装 ### <a class="anchor" id="source-install"></a>通过源码安装
如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine github 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. 如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine github 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
...@@ -160,7 +152,7 @@ taos> source <filename>; ...@@ -160,7 +152,7 @@ taos> source <filename>;
### <a class="anchor" id="taosBenchmark"></a> 使用 taosBenchmark 体验写入速度 ### <a class="anchor" id="taosBenchmark"></a> 使用 taosBenchmark 体验写入速度
启动 TDengine 的服务,在 Linux 终端执行 `taosBenchmark` (曾命名为 taosdemo) 启动 TDengine 的服务,在 Linux 终端执行 `taosBenchmark` (曾命名为 taosdemo)。taosBenchmark 在 TDengine 2.4.0.7 和之前发布版本在 taosTools 安装包中发布提供,在后续版本中 taosBenchmark 将在 TDengine 标准安装包中发布。
```bash ```bash
taosBenchmark taosBenchmark
......
...@@ -85,7 +85,7 @@ TDengine is a highly efficient platform to store, query, and analyze time-series ...@@ -85,7 +85,7 @@ TDengine is a highly efficient platform to store, query, and analyze time-series
* [taosAdapter](/tools/adapter): a bridge/adapter between TDengine cluster and applications. * [taosAdapter](/tools/adapter): a bridge/adapter between TDengine cluster and applications.
* [TDinsight](/tools/insight): monitoring TDengine cluster with Grafana. * [TDinsight](/tools/insight): monitoring TDengine cluster with Grafana.
* [taosdump](/tools/taosdump): backup tool for TDengine. Please install `taosTools` package for it. * [taosdump](/tools/taosdump): backup tool for TDengine. Please install `taosTools` package for it.
* [taosBenchmark](/tools/taosbenchmark): stress test tool for TDengine. Please install `taosTools` package for it. * [taosBenchmark](/tools/taosbenchmark): stress test tool for TDengine.
## [Connections with Other Tools](/connections) ## [Connections with Other Tools](/connections)
...@@ -93,8 +93,8 @@ TDengine is a highly efficient platform to store, query, and analyze time-series ...@@ -93,8 +93,8 @@ TDengine is a highly efficient platform to store, query, and analyze time-series
- [MATLAB](/connections#matlab): access data stored in TDengine server via JDBC configured within MATLAB - [MATLAB](/connections#matlab): access data stored in TDengine server via JDBC configured within MATLAB
- [R](/connections#r): access data stored in TDengine server via JDBC configured within R - [R](/connections#r): access data stored in TDengine server via JDBC configured within R
- [IDEA Database](https://www.taosdata.com/blog/2020/08/27/1767.html): use TDengine visually through IDEA Database Management Tool - [IDEA Database](https://www.taosdata.com/blog/2020/08/27/1767.html): use TDengine visually through IDEA Database Management Tool
- [TDengineGUI](https://github.com/skye0207/TDengineGUI) - [TDengineGUI](https://github.com/skye0207/TDengineGUI): a TDengine management tool with Graphical User Interface
- [DataX, a data immigaration tool with TDeninge supported](https://github.com/taosdata/datax) - [DataX](https://github.com/taosdata/datax): a data immigaration tool with TDeninge supported
## [Installation and Management of TDengine Cluster](/cluster) ## [Installation and Management of TDengine Cluster](/cluster)
......
...@@ -45,14 +45,6 @@ If the client and server are running on different computers, you can install the ...@@ -45,14 +45,6 @@ If the client and server are running on different computers, you can install the
<ul id="client-packagelist" class="package-list"></ul> <ul id="client-packagelist" class="package-list"></ul>
### Install taosTools
taosTools is a collection of helper software for TDengine. It currently includes taosdump for data backup and recovery and the script TDinsight.sh for installing grafanaplugin and dashboard.
To run taosdump, you need to install the TDengine server or TDengine client installer. It is recommended to use the deb or rpm format to install the dependent software.
<ul id="taos-tools" class="package-list"></ul>
### <a class="anchor" id="source-install"></a>Install from Source ### <a class="anchor" id="source-install"></a>Install from Source
If you want to contribute to TDengine, please visit [TDengine GitHub page](https://github.com/taosdata/TDengine) for detailed instructions on build and installation from the source code. If you want to contribute to TDengine, please visit [TDengine GitHub page](https://github.com/taosdata/TDengine) for detailed instructions on build and installation from the source code.
...@@ -166,7 +158,7 @@ taos> source <filename>; ...@@ -166,7 +158,7 @@ taos> source <filename>;
### <a class="anchor" id="taosBenchmark"></a> Taste insertion speed with taosBenchmark ### <a class="anchor" id="taosBenchmark"></a> Taste insertion speed with taosBenchmark
Once the TDengine server started, you can execute the command `taosBenchmark` (which was named `taosdemo`) in the Linux terminal. Once the TDengine server started, you can execute the command `taosBenchmark` (which was named `taosdemo`) in the Linux terminal. In 2.4.0.7 and early release, taosBenchmark is distributed within taosTools package. In later release, taosBenchmark will be included within TDengine again.
```bash ```bash
taosBenchmark taosBenchmark
......
...@@ -3,6 +3,7 @@ package com.taosdata.jdbc.rs; ...@@ -3,6 +3,7 @@ package com.taosdata.jdbc.rs;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.*; import com.taosdata.jdbc.*;
import com.taosdata.jdbc.enums.TimestampFormat;
import com.taosdata.jdbc.utils.HttpClientPoolUtil; import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import com.taosdata.jdbc.ws.InFlightRequest; import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport; import com.taosdata.jdbc.ws.Transport;
...@@ -77,18 +78,20 @@ public class RestfulDriver extends AbstractDriver { ...@@ -77,18 +78,20 @@ public class RestfulDriver extends AbstractDriver {
int maxRequest = props.containsKey(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST) int maxRequest = props.containsKey(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST)
? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST)) ? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST))
: Transport.DEFAULT_MAX_REQUEST; : Transport.DEFAULT_MAX_REQUEST;
InFlightRequest inFlightRequest = new InFlightRequest(timeout, maxRequest); InFlightRequest inFlightRequest = new InFlightRequest(timeout, maxRequest);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Map<String, String> httpHeaders = new HashMap<>(); Map<String, String> httpHeaders = new HashMap<>();
client = new WSClient(new URI(loginUrl), user, password, database, inFlightRequest, httpHeaders, latch, maxRequest); client = new WSClient(new URI(loginUrl), user, password, database,
inFlightRequest, httpHeaders, latch, maxRequest);
transport = new Transport(client, inFlightRequest); transport = new Transport(client, inFlightRequest);
if (!client.connectBlocking()) { if (!client.connectBlocking(timeout, TimeUnit.MILLISECONDS)) {
throw new SQLException("can't create connection with server"); throw new SQLException("can't create connection with server");
} }
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) { if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw new SQLException("auth timeout"); throw new SQLException("auth timeout");
} }
if (client.isAuth()) { if (!client.isAuth()) {
throw new SQLException("auth failure"); throw new SQLException("auth failure");
} }
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
...@@ -96,7 +99,9 @@ public class RestfulDriver extends AbstractDriver { ...@@ -96,7 +99,9 @@ public class RestfulDriver extends AbstractDriver {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new SQLException("creat websocket connection has been Interrupted ", e); throw new SQLException("creat websocket connection has been Interrupted ", e);
} }
return new WSConnection(url, props, transport, database, true); // TODO fetch Type from config
props.setProperty(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT, String.valueOf(TimestampFormat.TIMESTAMP));
return new WSConnection(url, props, transport, database);
} }
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + ""; loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
int poolSize = Integer.parseInt(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE)); int poolSize = Integer.parseInt(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE));
......
...@@ -302,6 +302,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet { ...@@ -302,6 +302,9 @@ public class RestfulResultSet extends AbstractResultSet implements ResultSet {
this.taos_type = taos_type; this.taos_type = taos_type;
} }
public int getTaosType() {
return taos_type;
}
} }
@Override @Override
......
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.*;
import com.taosdata.jdbc.rs.RestfulResultSet;
import com.taosdata.jdbc.rs.RestfulResultSetMetaData;
import com.taosdata.jdbc.ws.entity.*;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.ResolverStyle;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public abstract class AbstractWSResultSet extends AbstractResultSet {
public static DateTimeFormatter rfc3339Parser = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendValue(ChronoField.YEAR, 4)
.appendLiteral('-')
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendLiteral('-')
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.appendLiteral('T')
.appendValue(ChronoField.HOUR_OF_DAY, 2)
.appendLiteral(':')
.appendValue(ChronoField.MINUTE_OF_HOUR, 2)
.appendLiteral(':')
.appendValue(ChronoField.SECOND_OF_MINUTE, 2)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 2, 9, true)
.optionalEnd()
.appendOffset("+HH:MM", "Z").toFormatter()
.withResolverStyle(ResolverStyle.STRICT)
.withChronology(IsoChronology.INSTANCE);
protected final Statement statement;
protected final Transport transport;
protected final RequestFactory factory;
protected final long queryId;
protected boolean isClosed;
// meta
protected final ResultSetMetaData metaData;
protected final List<RestfulResultSet.Field> fields = new ArrayList<>();
protected final List<String> columnNames;
protected List<Integer> fieldLength;
// data
protected List<List<Object>> result = new ArrayList<>();
protected int numOfRows = 0;
protected int rowIndex = 0;
private boolean isCompleted;
public AbstractWSResultSet(Statement statement, Transport transport, RequestFactory factory,
QueryResp response, String database) throws SQLException {
this.statement = statement;
this.transport = transport;
this.factory = factory;
this.queryId = response.getId();
columnNames = Arrays.asList(response.getFieldsNames());
for (int i = 0; i < response.getFieldsCount(); i++) {
String colName = response.getFieldsNames()[i];
int taosType = response.getFieldsTypes()[i];
int jdbcType = TSDBConstants.taosType2JdbcType(taosType);
int length = response.getFieldsLengths()[i];
fields.add(new RestfulResultSet.Field(colName, jdbcType, length, "", taosType));
}
this.metaData = new RestfulResultSetMetaData(database, fields, null);
this.timestampPrecision = response.getPrecision();
}
private boolean forward() {
if (this.rowIndex > this.numOfRows) {
return false;
}
return ((++this.rowIndex) < this.numOfRows);
}
public void reset() {
this.rowIndex = 0;
}
@Override
public boolean next() throws SQLException {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
}
if (this.forward()) {
return true;
}
Request request = factory.generateFetch(queryId);
CompletableFuture<Response> send = transport.send(request);
try {
Response response = send.get();
FetchResp fetchResp = (FetchResp) response;
if (Code.SUCCESS.getCode() != fetchResp.getCode()) {
// TODO reWrite error type
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, fetchResp.getMessage());
}
this.reset();
if (fetchResp.isCompleted()) {
this.isCompleted = true;
return false;
}
fieldLength = Arrays.asList(fetchResp.getLengths());
this.numOfRows = fetchResp.getRows();
this.result = fetchJsonData();
return true;
} catch (InterruptedException | ExecutionException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_IOException, e.getMessage());
}
}
public abstract List<List<Object>> fetchJsonData() throws SQLException, ExecutionException, InterruptedException;
@Override
public void close() throws SQLException {
this.isClosed = true;
if (result != null && !result.isEmpty() && !isCompleted) {
FetchReq fetchReq = new FetchReq(queryId, queryId);
transport.sendWithoutRep(new Request(Action.FREE_RESULT.getAction(), fetchReq));
}
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
return this.metaData;
}
@Override
public boolean isClosed() throws SQLException {
return isClosed;
}
}
package com.taosdata.jdbc.ws; package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.ws.entity.Action;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* Unfinished execution * Unfinished execution
*/ */
public class InFlightRequest implements AutoCloseable { public class InFlightRequest {
private final int timeoutSec; private final int timeoutSec;
private final Semaphore semaphore; private final Semaphore semaphore;
private final Map<String, ResponseFuture> futureMap = new ConcurrentHashMap<>(); private final Map<String, ConcurrentHashMap<Long, ResponseFuture>> futureMap = new HashMap<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final Map<String, PriorityBlockingQueue<ResponseFuture>> expireMap = new HashMap<>();
private final ScheduledFuture<?> scheduledFuture; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("timer-" + t.getId());
return t;
});
public InFlightRequest(int timeoutSec, int concurrentNum) { public InFlightRequest(int timeoutSec, int concurrentNum) {
this.timeoutSec = timeoutSec; this.timeoutSec = timeoutSec;
this.semaphore = new Semaphore(concurrentNum); this.semaphore = new Semaphore(concurrentNum);
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::removeTimeoutFuture, timeoutSec, timeoutSec, TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleWithFixedDelay(this::removeTimeoutFuture,
timeoutSec, timeoutSec, TimeUnit.MILLISECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(scheduledExecutorService::shutdown));
for (Action value : Action.values()) {
String action = value.getAction();
if (Action.CONN.getAction().equals(action))
continue;
futureMap.put(action, new ConcurrentHashMap<>());
expireMap.put(action, new PriorityBlockingQueue<>());
}
} }
public void put(ResponseFuture responseFuture) throws InterruptedException, TimeoutException { public void put(ResponseFuture rf) throws InterruptedException, TimeoutException {
if (semaphore.tryAcquire(timeoutSec, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(timeoutSec, TimeUnit.MILLISECONDS)) {
futureMap.put(responseFuture.getId(), responseFuture); futureMap.get(rf.getAction()).put(rf.getId(), rf);
expireMap.get(rf.getAction()).put(rf);
} else { } else {
throw new TimeoutException(); throw new TimeoutException();
} }
} }
public ResponseFuture remove(String id) { public ResponseFuture remove(String action, Long id) {
ResponseFuture future = futureMap.remove(id); ResponseFuture future = futureMap.get(action).remove(id);
if (null != future) { if (null != future) {
expireMap.get(action).remove(future);
semaphore.release(); semaphore.release();
} }
return future; return future;
} }
private void removeTimeoutFuture() { private void removeTimeoutFuture() {
futureMap.entrySet().removeIf(entry -> { expireMap.forEach((k, v) -> {
if (System.nanoTime() - entry.getValue().getTimestamp() > timeoutSec * 1_000_000L) { while (true) {
ResponseFuture response = v.peek();
if (null == response || (System.nanoTime() - response.getTimestamp()) < timeoutSec * 1_000_000L)
break;
try { try {
entry.getValue().getFuture().completeExceptionally(new TimeoutException()); v.poll();
}finally { futureMap.get(k).remove(response.getId());
response.getFuture().completeExceptionally(new TimeoutException());
} finally {
semaphore.release(); semaphore.release();
} }
return true;
} else {
return false;
} }
}); });
} }
@Override
public void close() {
scheduledFuture.cancel(true);
scheduledExecutorService.shutdown();
}
} }
...@@ -4,18 +4,24 @@ import com.taosdata.jdbc.ws.entity.Response; ...@@ -4,18 +4,24 @@ import com.taosdata.jdbc.ws.entity.Response;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class ResponseFuture { public class ResponseFuture implements Comparable<ResponseFuture> {
private final String id; private final String action;
private final Long id;
private final CompletableFuture<Response> future; private final CompletableFuture<Response> future;
private final long timestamp; private final long timestamp;
public ResponseFuture(String id, CompletableFuture<Response> future) { public ResponseFuture(String action, Long id, CompletableFuture<Response> future) {
this.action = action;
this.id = id; this.id = id;
this.future = future; this.future = future;
timestamp = System.nanoTime(); timestamp = System.nanoTime();
} }
public String getId() { public String getAction() {
return action;
}
public Long getId() {
return id; return id;
} }
...@@ -26,4 +32,12 @@ public class ResponseFuture { ...@@ -26,4 +32,12 @@ public class ResponseFuture {
long getTimestamp() { long getTimestamp() {
return timestamp; return timestamp;
} }
@Override
public int compareTo(ResponseFuture rf) {
long r = this.timestamp - rf.timestamp;
if (r > 0) return 1;
if (r < 0) return -1;
return 0;
}
} }
...@@ -25,15 +25,19 @@ public class Transport implements AutoCloseable { ...@@ -25,15 +25,19 @@ public class Transport implements AutoCloseable {
public CompletableFuture<Response> send(Request request) { public CompletableFuture<Response> send(Request request) {
CompletableFuture<Response> completableFuture = new CompletableFuture<>(); CompletableFuture<Response> completableFuture = new CompletableFuture<>();
try { try {
inFlightRequest.put(new ResponseFuture(request.id(), completableFuture)); inFlightRequest.put(new ResponseFuture(request.getAction(), request.id(), completableFuture));
client.send(request.toString()); client.send(request.toString());
} catch (Throwable t) { } catch (Throwable t) {
inFlightRequest.remove(request.id()); inFlightRequest.remove(request.getAction(), request.id());
completableFuture.completeExceptionally(t); completableFuture.completeExceptionally(t);
} }
return completableFuture; return completableFuture;
} }
public void sendWithoutRep(Request request) {
client.send(request.toString());
}
public boolean isClosed() throws SQLException { public boolean isClosed() throws SQLException {
return client.isClosed(); return client.isClosed();
} }
......
...@@ -7,6 +7,8 @@ import org.java_websocket.handshake.ServerHandshake; ...@@ -7,6 +7,8 @@ import org.java_websocket.handshake.ServerHandshake;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
...@@ -20,6 +22,7 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -20,6 +22,7 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
ThreadPoolExecutor executor; ThreadPoolExecutor executor;
private boolean auth; private boolean auth;
private int reqId;
public boolean isAuth() { public boolean isAuth() {
return auth; return auth;
...@@ -54,8 +57,8 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -54,8 +57,8 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
@Override @Override
public void onOpen(ServerHandshake serverHandshake) { public void onOpen(ServerHandshake serverHandshake) {
// certification // certification
Request request = Request.generateConnect(user, password, database); ConnectReq connectReq = new ConnectReq(++reqId, user, password, database);
this.send(request.toString()); this.send(new Request(Action.CONN.getAction(), connectReq).toString());
} }
@Override @Override
...@@ -64,14 +67,15 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -64,14 +67,15 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
executor.submit(() -> { executor.submit(() -> {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject jsonObject = JSONObject.parseObject(message);
if (Action.CONN.getAction().equals(jsonObject.getString("action"))) { if (Action.CONN.getAction().equals(jsonObject.getString("action"))) {
latch.countDown();
if (Code.SUCCESS.getCode() != jsonObject.getInteger("code")) { if (Code.SUCCESS.getCode() != jsonObject.getInteger("code")) {
auth = false;
this.close(); this.close();
} else {
auth = true;
} }
latch.countDown();
} else { } else {
Response response = parseMessage(jsonObject); Response response = parseMessage(jsonObject);
ResponseFuture remove = inFlightRequest.remove(response.id()); ResponseFuture remove = inFlightRequest.remove(response.getAction(), response.getReqId());
if (null != remove) { if (null != remove) {
remove.getFuture().complete(response); remove.getFuture().complete(response);
} }
...@@ -87,7 +91,14 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -87,7 +91,14 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
@Override @Override
public void onMessage(ByteBuffer bytes) { public void onMessage(ByteBuffer bytes) {
super.onMessage(bytes); bytes.order(ByteOrder.LITTLE_ENDIAN);
long id = bytes.getLong();
ResponseFuture remove = inFlightRequest.remove(Action.FETCH_BLOCK.getAction(), id);
if (null != remove) {
// FetchBlockResp fetchBlockResp = new FetchBlockResp(id, bytes.slice());
FetchBlockResp fetchBlockResp = new FetchBlockResp(id, bytes);
remove.getFuture().complete(fetchBlockResp);
}
} }
@Override @Override
...@@ -97,7 +108,6 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -97,7 +108,6 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
} else { } else {
throw new RuntimeException("close connection: " + reason); throw new RuntimeException("close connection: " + reason);
} }
} }
@Override @Override
...@@ -109,6 +119,42 @@ public class WSClient extends WebSocketClient implements AutoCloseable { ...@@ -109,6 +119,42 @@ public class WSClient extends WebSocketClient implements AutoCloseable {
public void close() { public void close() {
super.close(); super.close();
executor.shutdown(); executor.shutdown();
inFlightRequest.close(); }
static class ConnectReq extends Payload {
private String user;
private String password;
private String db;
public ConnectReq(long reqId, String user, String password, String db) {
super(reqId);
this.user = user;
this.password = password;
this.db = db;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDb() {
return db;
}
public void setDb(String db) {
this.db = db;
}
} }
} }
...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBDriver; ...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError; import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers; import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.rs.RestfulDatabaseMetaData; import com.taosdata.jdbc.rs.RestfulDatabaseMetaData;
import com.taosdata.jdbc.ws.entity.RequestFactory;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
...@@ -16,14 +17,14 @@ public class WSConnection extends AbstractConnection { ...@@ -16,14 +17,14 @@ public class WSConnection extends AbstractConnection {
private final Transport transport; private final Transport transport;
private final DatabaseMetaData metaData; private final DatabaseMetaData metaData;
private final String database; private final String database;
private boolean fetchType; private final RequestFactory factory;
public WSConnection(String url, Properties properties, Transport transport, String database, boolean fetchType) { public WSConnection(String url, Properties properties, Transport transport, String database) {
super(properties); super(properties);
this.transport = transport; this.transport = transport;
this.database = database; this.database = database;
this.fetchType = fetchType;
this.metaData = new RestfulDatabaseMetaData(url, properties.getProperty(TSDBDriver.PROPERTY_KEY_USER), this); this.metaData = new RestfulDatabaseMetaData(url, properties.getProperty(TSDBDriver.PROPERTY_KEY_USER), this);
this.factory = new RequestFactory();
} }
@Override @Override
...@@ -31,8 +32,7 @@ public class WSConnection extends AbstractConnection { ...@@ -31,8 +32,7 @@ public class WSConnection extends AbstractConnection {
if (isClosed()) if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
// return new WSStatement(transport, database , fetchType); return new WSStatement(transport, database, this, factory);
return null;
} }
@Override @Override
......
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.AbstractStatement;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.utils.SqlSyntaxValidator;
import com.taosdata.jdbc.ws.entity.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WSStatement extends AbstractStatement {
private final Transport transport;
private final String database;
private final Connection connection;
private final RequestFactory factory;
private boolean closed;
private ResultSet resultSet;
public WSStatement(Transport transport, String database, Connection connection, RequestFactory factory) {
this.transport = transport;
this.database = database;
this.connection = connection;
this.factory = factory;
}
@Override
public ResultSet executeQuery(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecuteQuery(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_QUERY, "not a valid sql for executeQuery: " + sql);
this.execute(sql);
return this.resultSet;
}
@Override
public int executeUpdate(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
if (!SqlSyntaxValidator.isValidForExecuteUpdate(sql))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_FOR_EXECUTE_UPDATE, "not a valid sql for executeUpdate: " + sql);
this.execute(sql);
return affectedRows;
}
@Override
public void close() throws SQLException {
if (!isClosed())
this.closed = true;
}
@Override
public boolean execute(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
Request request = factory.generateQuery(sql);
CompletableFuture<Response> send = transport.send(request);
Response response;
try {
response = send.get();
QueryResp queryResp = (QueryResp) response;
if (Code.SUCCESS.getCode() != queryResp.getCode()) {
throw TSDBError.createSQLException(queryResp.getCode(), queryResp.getMessage());
}
if (queryResp.isUpdate()) {
this.resultSet = null;
this.affectedRows = queryResp.getAffectedRows();
return false;
} else {
this.resultSet = new BlockResultSet(this, this.transport, this.factory, queryResp, this.database);
this.affectedRows = -1;
return true;
}
} catch (InterruptedException | ExecutionException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_WITH_EXECUTEQUERY, e.getMessage());
}
}
@Override
public ResultSet getResultSet() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
return this.resultSet;
}
@Override
public int getUpdateCount() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
return affectedRows;
}
@Override
public Connection getConnection() throws SQLException {
return this.connection;
}
@Override
public boolean isClosed() throws SQLException {
return closed;
}
}
...@@ -11,8 +11,9 @@ public enum Action { ...@@ -11,8 +11,9 @@ public enum Action {
QUERY("query", QueryResp.class), QUERY("query", QueryResp.class),
FETCH("fetch", FetchResp.class), FETCH("fetch", FetchResp.class),
FETCH_JSON("fetch_json", FetchJsonResp.class), FETCH_JSON("fetch_json", FetchJsonResp.class),
// fetch_block's class is meaningless FETCH_BLOCK("fetch_block", FetchBlockResp.class),
FETCH_BLOCK("fetch_block", Response.class), // free_result's class is meaningless
FREE_RESULT("free_result", Response.class),
; ;
private final String action; private final String action;
private final Class<? extends Response> clazz; private final Class<? extends Response> clazz;
...@@ -35,7 +36,6 @@ public enum Action { ...@@ -35,7 +36,6 @@ public enum Action {
static { static {
for (Action value : Action.values()) { for (Action value : Action.values()) {
actions.put(value.action, value); actions.put(value.action, value);
IdUtil.init(value.action);
} }
} }
......
...@@ -5,7 +5,6 @@ package com.taosdata.jdbc.ws.entity; ...@@ -5,7 +5,6 @@ package com.taosdata.jdbc.ws.entity;
*/ */
public enum Code { public enum Code {
SUCCESS(0, "success"), SUCCESS(0, "success"),
; ;
private final int code; private final int code;
......
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
public class FetchBlockResp { import java.nio.ByteBuffer;
public class FetchBlockResp extends Response {
private ByteBuffer buffer;
public FetchBlockResp(long id, ByteBuffer buffer) {
this.setAction(Action.FETCH_BLOCK.getAction());
this.setReqId(id);
this.buffer = buffer;
}
public ByteBuffer getBuffer() {
return buffer;
}
public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}
} }
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.JSONArray;
public class FetchJsonResp extends Response{ public class FetchJsonResp extends Response{
private long id; private long id;
private Object[][] data; private JSONArray data;
public Object[][] getData() { public JSONArray getData() {
return data; return data;
} }
public void setData(Object[][] data) { public void setData(JSONArray data) {
this.data = data; this.data = data;
} }
......
package com.taosdata.jdbc.ws.entity;
public class FetchReq extends Payload {
private long id;
public FetchReq(long reqId, long id) {
super(reqId);
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
}
...@@ -8,7 +8,7 @@ public class FetchResp extends Response{ ...@@ -8,7 +8,7 @@ public class FetchResp extends Response{
private String message; private String message;
private long id; private long id;
private boolean completed; private boolean completed;
private int[] lengths; private Integer[] lengths;
private int rows; private int rows;
public int getCode() { public int getCode() {
...@@ -43,11 +43,11 @@ public class FetchResp extends Response{ ...@@ -43,11 +43,11 @@ public class FetchResp extends Response{
this.completed = completed; this.completed = completed;
} }
public int[] getLengths() { public Integer[] getLengths() {
return lengths; return lengths;
} }
public void setLengths(int[] lengths) { public void setLengths(Integer[] lengths) {
this.lengths = lengths; this.lengths = lengths;
} }
......
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* generate id for request
*/
public class IdUtil {
private static final Map<String, AtomicLong> ids = new HashMap<>();
public static long getId(String action) {
return ids.get(action).incrementAndGet();
}
public static void init(String action) {
ids.put(action, new AtomicLong(0));
}
}
package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.annotation.JSONField;
public class Payload {
@JSONField(name = "req_id")
private final long reqId;
public Payload(long reqId) {
this.reqId = reqId;
}
public long getReqId() {
return reqId;
}
}
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
public class QueryReq extends Payload {
private String sql;
public QueryReq(long reqId, String sql) {
super(reqId);
this.sql = sql;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
package com.taosdata.jdbc.ws.entity; package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
/** /**
* send to taosadapter * send to taosadapter
...@@ -15,14 +14,14 @@ public class Request { ...@@ -15,14 +14,14 @@ public class Request {
this.args = args; this.args = args;
} }
public String id() {
return action + "_" + args.getReqId();
}
public String getAction() { public String getAction() {
return action; return action;
} }
public Long id(){
return args.getReqId();
}
public void setAction(String action) { public void setAction(String action) {
this.action = action; this.action = action;
} }
...@@ -39,118 +38,4 @@ public class Request { ...@@ -39,118 +38,4 @@ public class Request {
public String toString() { public String toString() {
return JSON.toJSONString(this); return JSON.toJSONString(this);
} }
public static Request generateConnect(String user, String password, String db) {
long reqId = IdUtil.getId(Action.CONN.getAction());
ConnectReq connectReq = new ConnectReq(reqId, user, password, db);
return new Request(Action.CONN.getAction(), connectReq);
}
public static Request generateQuery(String sql) {
long reqId = IdUtil.getId(Action.QUERY.getAction());
QueryReq queryReq = new QueryReq(reqId, sql);
return new Request(Action.QUERY.getAction(), queryReq);
}
public static Request generateFetch(long id) {
long reqId = IdUtil.getId(Action.FETCH.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH.getAction(), fetchReq);
}
public static Request generateFetchJson(long id) {
long reqId = IdUtil.getId(Action.FETCH_JSON.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_JSON.getAction(), fetchReq);
}
public static Request generateFetchBlock(long id) {
long reqId = IdUtil.getId(Action.FETCH_BLOCK.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_BLOCK.getAction(), fetchReq);
}
}
class Payload {
@JSONField(name = "req_id")
private final long reqId;
public Payload(long reqId) {
this.reqId = reqId;
}
public long getReqId() {
return reqId;
}
}
class ConnectReq extends Payload {
private String user;
private String password;
private String db;
public ConnectReq(long reqId, String user, String password, String db) {
super(reqId);
this.user = user;
this.password = password;
this.db = db;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDb() {
return db;
}
public void setDb(String db) {
this.db = db;
}
}
class QueryReq extends Payload {
private String sql;
public QueryReq(long reqId, String sql) {
super(reqId);
this.sql = sql;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
class FetchReq extends Payload {
private long id;
public FetchReq(long reqId, long id) {
super(reqId);
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
} }
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* generate id for request
*/
public class RequestFactory {
private final Map<String, AtomicLong> ids = new HashMap<>();
public long getId(String action) {
return ids.get(action).incrementAndGet();
}
public RequestFactory() {
for (Action value : Action.values()) {
String action = value.getAction();
if (Action.CONN.getAction().equals(action) || Action.FETCH_BLOCK.getAction().equals(action))
continue;
ids.put(action, new AtomicLong(0));
}
}
public Request generateQuery(String sql) {
long reqId = this.getId(Action.QUERY.getAction());
QueryReq queryReq = new QueryReq(reqId, sql);
return new Request(Action.QUERY.getAction(), queryReq);
}
public Request generateFetch(long id) {
long reqId = this.getId(Action.FETCH.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH.getAction(), fetchReq);
}
public Request generateFetchJson(long id) {
long reqId = this.getId(Action.FETCH_JSON.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_JSON.getAction(), fetchReq);
}
public Request generateFetchBlock(long id) {
FetchReq fetchReq = new FetchReq(id, id);
return new Request(Action.FETCH_BLOCK.getAction(), fetchReq);
}
}
...@@ -11,10 +11,6 @@ public class Response { ...@@ -11,10 +11,6 @@ public class Response {
@JSONField(name = "req_id") @JSONField(name = "req_id")
private long reqId; private long reqId;
public String id() {
return action + "_" + reqId;
}
public String getAction() { public String getAction() {
return action; return action;
} }
......
...@@ -10,15 +10,17 @@ import org.junit.runner.RunWith; ...@@ -10,15 +10,17 @@ import org.junit.runner.RunWith;
import java.sql.*; import java.sql.*;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
/** /**
* You need to start taosadapter before testing this method * You need to start taosadapter before testing this method
*/ */
@Ignore @Ignore
@RunWith(CatalogRunner.class) @RunWith(CatalogRunner.class)
@TestTarget(alias = "test connection with server", author = "huolibo",version = "2.0.37") @TestTarget(alias = "test connection with server", author = "huolibo", version = "2.0.37")
public class WSConnectionTest { public class WSConnectionTest {
private static final String host = "192.168.1.98"; // private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041; private static final int port = 6041;
private Connection connection; private Connection connection;
...@@ -46,13 +48,12 @@ public class WSConnectionTest { ...@@ -46,13 +48,12 @@ public class WSConnectionTest {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/"; String url = "jdbc:TAOS-RS://" + host + ":" + port + "/";
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root"); properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD,"taosdata"); properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, "taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties); connection = DriverManager.getConnection(url, properties);
} }
@Test @Test(expected = SQLException.class)
// @Test(expected = SQLException.class)
@Description("wrong password or user") @Description("wrong password or user")
public void wrongUserOrPasswordConection() throws SQLException { public void wrongUserOrPasswordConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata"; String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata";
...@@ -60,4 +61,21 @@ public class WSConnectionTest { ...@@ -60,4 +61,21 @@ public class WSConnectionTest {
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties); connection = DriverManager.getConnection(url, properties);
} }
@Test
@Description("sleep keep connection")
public void keepConnection() throws SQLException, InterruptedException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
TimeUnit.MINUTES.sleep(1);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("show databases");
TimeUnit.MINUTES.sleep(1);
resultSet.next();
System.out.println(resultSet.getTimestamp(1));
resultSet.close();
statement.close();
}
} }
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.annotation.CatalogRunner;
import com.taosdata.jdbc.annotation.Description;
import com.taosdata.jdbc.annotation.TestTarget;
import org.junit.*;
import org.junit.runner.RunWith;
import java.sql.*;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@Ignore
@RunWith(CatalogRunner.class)
@TestTarget(alias = "query test", author = "huolibo", version = "2.0.38")
@FixMethodOrder
public class WSQueryTest {
private static final String host = "192.168.1.98";
private static final int port = 6041;
private static final String databaseName = "ws_query";
private static final String tableName = "wq";
private Connection connection;
private long now;
@Description("query")
@Test
public void queryBlock() throws SQLException, InterruptedException {
IntStream.range(1, 100).limit(1000).parallel().forEach(x -> {
try {
Statement statement = connection.createStatement();
statement.execute("insert into " + databaseName + "." + tableName + " values(now+100s, 100)");
ResultSet resultSet = statement.executeQuery("select * from " + databaseName + "." + tableName);
resultSet.next();
Assert.assertEquals(100, resultSet.getInt(2));
statement.close();
TimeUnit.SECONDS.sleep(10);
} catch (SQLException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Before
public void before() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.execute("drop database if exists " + databaseName);
statement.execute("create database " + databaseName);
statement.execute("use " + databaseName);
statement.execute("create table if not exists " + databaseName + "." + tableName + "(ts timestamp, f int)");
statement.close();
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.enums.TimestampFormat;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@Ignore
public class WSSelectTest {
// private static final String host = "192.168.1.98";
private static final String host = "127.0.0.1";
private static final int port = 6041;
private static Connection connection;
private static final String databaseName = "driver";
private static void testInsert() throws SQLException {
Statement statement = connection.createStatement();
long cur = System.currentTimeMillis();
List<String> timeList = new ArrayList<>();
for (long i = 0L; i < 3000; i++) {
long t = cur + i;
timeList.add("insert into " + databaseName + ".alltype_query values(" + t + ",1,1,1,1,1,1,1,1,1,1,1,'test_binary','test_nchar')");
}
for (int i = 0; i < 3000; i++) {
statement.execute(timeList.get(i));
}
statement.close();
}
@Test
public void testWSSelect() throws SQLException {
Statement statement = connection.createStatement();
int count = 0;
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ResultSet resultSet = statement.executeQuery("select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from " + databaseName + ".alltype_query limit 3000");
while (resultSet.next()) {
count++;
resultSet.getTimestamp(1);
resultSet.getBoolean(2);
resultSet.getInt(3);
resultSet.getInt(4);
resultSet.getInt(5);
resultSet.getLong(6);
resultSet.getInt(7);
resultSet.getInt(8);
resultSet.getLong(9);
resultSet.getLong(10);
resultSet.getFloat(11);
resultSet.getDouble(12);
resultSet.getString(13);
resultSet.getString(14);
}
}
long d = System.nanoTime() - start;
System.out.println(d / 1000);
System.out.println(count);
statement.close();
}
@BeforeClass
public static void beforeClass() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIMESTAMP_FORMAT, String.valueOf(TimestampFormat.UTC));
properties.setProperty(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT, "100000");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.execute("drop database if exists " + databaseName);
statement.execute("create database " + databaseName);
statement.execute("create table " + databaseName + ".alltype_query(ts timestamp, c1 bool,c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, c10 float, c11 double, c12 binary(20), c13 nchar(30) )");
statement.close();
testInsert();
}
}
Subproject commit ecc3f50cfb868da22c716ffbcda7b6c40c9180fa Subproject commit ca4a90027ddfd5faa858a676e695ddcdd56ef2b5
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册