diff --git a/docs/en/07-develop/_sub_java.mdx b/docs/en/07-develop/_sub_java.mdx
index d14b5fd6095dd90f89dd2c2e828858585cfddff9..a928fa8836236aa08f4093c36c1d057c30018e21 100644
--- a/docs/en/07-develop/_sub_java.mdx
+++ b/docs/en/07-develop/_sub_java.mdx
@@ -1,11 +1,24 @@
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
-{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
-{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
-```
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
\ No newline at end of file
+
+
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+ ```
+
+
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+ ```
+
+
diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx
index 61ce1660694aee4ded365cfbff0427ec48192348..36992da636a6773ede98ebca2ed6b981187afa93 100644
--- a/docs/en/14-reference/03-connector/04-java.mdx
+++ b/docs/en/14-reference/03-connector/04-java.mdx
@@ -696,6 +696,9 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: Specifies whether to commit automatically.
- group.id: consumer: Specifies the group that the consumer is in.
- value.deserializer: To deserialize the results, you can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify the result set bean. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and perform custom deserialization based on the SQL result set.
+- td.connect.type: Specifies the type connect with TDengine, `jni` or `WebSocket`. default is `jni`
+- httpConnectTimeout:WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
+- messageWaitTimeout:socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
- For more information, see [Consumer Parameters](../../../develop/tmq).
#### Subscribe to consume data
@@ -724,6 +727,11 @@ For more information, see [Data Subscription](../../../develop/tmq).
### Usage examples
+
+
+
+In addition to the native connection, the Java Connector also supports subscribing via websocket.
+
```java
public abstract class ConsumerLoop {
private final TaosConsumer consumer;
@@ -795,6 +803,87 @@ public abstract class ConsumerLoop {
}
```
+
+
+
+```java
+public abstract class ConsumerLoop {
+ private final TaosConsumer consumer;
+ private final List topics;
+ private final AtomicBoolean shutdown;
+ private final CountDownLatch shutdownLatch;
+
+ public ConsumerLoop() throws SQLException {
+ Properties config = new Properties();
+ config.setProperty("bootstrap.servers", "localhost:6041");
+ config.setProperty("td.connect.type", "ws");
+ config.setProperty("msg.with.table.name", "true");
+ config.setProperty("enable.auto.commit", "true");
+ config.setProperty("group.id", "group2");
+ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
+
+ this.consumer = new TaosConsumer<>(config);
+ this.topics = Collections.singletonList("topic_speed");
+ this.shutdown = new AtomicBoolean(false);
+ this.shutdownLatch = new CountDownLatch(1);
+ }
+
+ public abstract void process(ResultBean result);
+
+ public void pollData() throws SQLException {
+ try {
+ consumer.subscribe(topics);
+
+ while (!shutdown.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
+ }
+ consumer.unsubscribe();
+ } finally {
+ consumer.close();
+ shutdownLatch.countDown();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ shutdownLatch.await();
+ }
+
+ public static class ResultDeserializer extends ReferenceDeserializer {
+
+ }
+
+ public static class ResultBean {
+ private Timestamp ts;
+ private int speed;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public int getSpeed() {
+ return speed;
+ }
+
+ public void setSpeed(int speed) {
+ this.speed = speed;
+ }
+ }
+}
+```
+
+
+
+
+> **Note**: The value of value.deserializer should be adjusted based on the package path of the test environment.
+
### Use with connection pool
#### HikariCP
@@ -878,8 +967,8 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
| taos-jdbcdriver version | major changes |
| :---------------------: | :--------------------------------------------: |
-| 3.0.3 | fix timestamp resolution error for REST connection in jdk17+ version |
-| 3.0.1 - 3.0.2 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use 3.0.2 in the JDK 8 environment |
+| 3.1.0 | JDBC REST connection supports subscription over WebSocket |
+| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment |
| 3.0.0 | Support for TDengine 3.0 |
| 2.0.42 | fix wasNull interface return value in WebSocket connection |
| 2.0.41 | fix decode method of username and password in REST connection |
diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml
index 634c3f75a8386db4caab5c1d598f89dc93926c54..01d0ce936c20df1f8567869c1307606ab949cd32 100644
--- a/docs/examples/java/pom.xml
+++ b/docs/examples/java/pom.xml
@@ -1,6 +1,7 @@
-
4.0.0
@@ -17,13 +18,13 @@
-
+
com.taosdata.jdbc
taos-jdbcdriver
- 3.0.0
+ 3.1.0
-
+
junit
junit
@@ -32,4 +33,4 @@
-
+
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java
new file mode 100644
index 0000000000000000000000000000000000000000..d953a7364160a686d67b4390f3370999b02ce5d4
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java
@@ -0,0 +1,79 @@
+package com.taos.example;
+
+import com.taosdata.jdbc.tmq.ConsumerRecords;
+import com.taosdata.jdbc.tmq.TMQConstants;
+import com.taosdata.jdbc.tmq.TaosConsumer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class WebsocketSubscribeDemo {
+ private static final String TOPIC = "tmq_topic_ws";
+ private static final String DB_NAME = "meters_ws";
+ private static final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ public static void main(String[] args) {
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+ public void run() {
+ shutdown.set(true);
+ }
+ }, 3_000);
+ try {
+ // prepare
+ Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
+ String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true";
+ try (Connection connection = DriverManager.getConnection(jdbcUrl);
+ Statement statement = connection.createStatement()) {
+ statement.executeUpdate("drop topic if exists " + TOPIC);
+ statement.executeUpdate("drop database if exists " + DB_NAME);
+ statement.executeUpdate("create database " + DB_NAME);
+ statement.executeUpdate("use " + DB_NAME);
+ statement.executeUpdate(
+ "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
+ statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
+ statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
+ statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
+ statement.executeUpdate(
+ "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
+ statement.executeUpdate(
+ "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
+ // create topic
+ statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
+ }
+
+ // create consumer
+ Properties properties = new Properties();
+ properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
+ properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
+ properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
+ properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
+ properties.setProperty(TMQConstants.GROUP_ID, "test");
+ properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
+ "com.taos.example.MetersDeserializer");
+
+ // poll data
+ try (TaosConsumer consumer = new TaosConsumer<>(properties)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+ while (!shutdown.get()) {
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ System.out.println(meter);
+ }
+ }
+ consumer.unsubscribe();
+ }
+ } catch (ClassNotFoundException | SQLException e) {
+ e.printStackTrace();
+ }
+ timer.cancel();
+ }
+}
diff --git a/docs/examples/java/src/test/java/com/taos/test/TestAll.java b/docs/examples/java/src/test/java/com/taos/test/TestAll.java
index 8d201da0745e1d2d36220c9d78383fc37d4a813a..f24156d8b13fc8f0dfb6eac52b8b68815e8e3483 100644
--- a/docs/examples/java/src/test/java/com/taos/test/TestAll.java
+++ b/docs/examples/java/src/test/java/com/taos/test/TestAll.java
@@ -64,21 +64,15 @@ public class TestAll {
@Test
public void testSubscribe() {
-
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(1000);
- insertData();
- } catch (SQLException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- thread.start();
SubscribeDemo.main(args);
}
+
+ @Test
+ public void testSubscribeOverWebsocket() {
+ WebsocketSubscribeDemo.main(args);
+ }
+
@Test
public void testSchemaless() throws SQLException {
LineProtocolExample.main(args);
diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx
index e7de158cc8d2b0b686b25bbe96e7a092c2a68e51..e5e46b2af0db52f4d6494a225c585b2a7b0a9a4b 100644
--- a/docs/zh/07-develop/_sub_java.mdx
+++ b/docs/zh/07-develop/_sub_java.mdx
@@ -1,3 +1,5 @@
+
+
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
```
@@ -6,4 +8,17 @@
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
\ No newline at end of file
+```
+
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+```
+
+
diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx
index fc6dc571383b8b003ee6e8c8dce73fa7645978e0..061475f51e0cf9c25ba09d670e9ceef0bd2bab71 100644
--- a/docs/zh/08-connector/14-java.mdx
+++ b/docs/zh/08-connector/14-java.mdx
@@ -699,7 +699,10 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
-- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
+- td.connect.type: 连接方式。jni:表示使用动态库连接的方式,ws/WebSocket:表示使用 WebSocket 进行数据通信。默认为 jni 方式。
+- httpConnectTimeout:创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
+- messageWaitTimeout:数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
+其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
@@ -727,6 +730,9 @@ consumer.close()
### 使用示例如下:
+
+
+
```java
public abstract class ConsumerLoop {
private final TaosConsumer consumer;
@@ -798,6 +804,89 @@ public abstract class ConsumerLoop {
}
```
+
+
+
+除了原生的连接方式,Java 连接器还支持通过 WebSocket 订阅数据。
+
+```java
+public abstract class ConsumerLoop {
+ private final TaosConsumer consumer;
+ private final List topics;
+ private final AtomicBoolean shutdown;
+ private final CountDownLatch shutdownLatch;
+
+ public ConsumerLoop() throws SQLException {
+ Properties config = new Properties();
+ config.setProperty("bootstrap.servers", "localhost:6041");
+ config.setProperty("td.connect.type", "ws");
+ config.setProperty("msg.with.table.name", "true");
+ config.setProperty("enable.auto.commit", "true");
+ config.setProperty("group.id", "group2");
+ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
+
+ this.consumer = new TaosConsumer<>(config);
+ this.topics = Collections.singletonList("topic_speed");
+ this.shutdown = new AtomicBoolean(false);
+ this.shutdownLatch = new CountDownLatch(1);
+ }
+
+ public abstract void process(ResultBean result);
+
+ public void pollData() throws SQLException {
+ try {
+ consumer.subscribe(topics);
+
+ while (!shutdown.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
+ }
+ consumer.unsubscribe();
+ } finally {
+ consumer.close();
+ shutdownLatch.countDown();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ shutdownLatch.await();
+ }
+
+ public static class ResultDeserializer extends ReferenceDeserializer {
+
+ }
+
+ public static class ResultBean {
+ private Timestamp ts;
+ private int speed;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public int getSpeed() {
+ return speed;
+ }
+
+ public void setSpeed(int speed) {
+ this.speed = speed;
+ }
+ }
+}
+```
+
+
+
+
+> **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
+
### 与连接池使用
#### HikariCP
@@ -881,8 +970,8 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver 版本 | 主要变化 |
| :------------------: | :----------------------------: |
-| 3.0.3 | 修复 REST 连接在 jdk17+ 版本时间戳解析错误问题 |
-| 3.0.1 - 3.0.2 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用 3.0.2 版本 |
+| 3.1.0 | WebSocket 连接支持订阅功能 |
+| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 |
| 3.0.0 | 支持 TDengine 3.0 |
| 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 |
| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 |