diff --git a/documentation20/webdocs/markdowndocs/connector-ch.md b/documentation20/webdocs/markdowndocs/connector-ch.md index ae39c78101fc7eacdfa28813f6fef462dbd17ee6..499209286499709f2367099943691b5e3f25407f 100644 --- a/documentation20/webdocs/markdowndocs/connector-ch.md +++ b/documentation20/webdocs/markdowndocs/connector-ch.md @@ -460,6 +460,49 @@ while(resultSet.next()){ > 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。 +### 订阅 + +#### 创建 + +```java +TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false); +``` + +`subscribe` 方法的三个参数含义如下: + +* topic:订阅的主题(即名称),此参数是订阅的唯一标识 +* sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据 +* restart:如果订阅已经存在,是重新开始,还是继续之前的订阅 + +如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic' 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。 + +#### 消费数据 + +```java +int total = 0; +while(true) { + TSDBResultSet rs = sub.consume(); + int count = 0; + while(rs.next()) { + count++; + } + total += count; + System.out.printf("%d rows consumed, total %d\n", count, total); + Thread.sleep(1000); +} +``` + +`consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的`Thread.sleep(1000)`),否则会给服务端造成不必要的压力。 + +#### 关闭订阅 + +```java +sub.close(true); +``` + +`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。 + + ### 关闭资源 ```java diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java index 062cb63cfd508800d2135494a8d45b6682a16fc1..2ce39b7ee4b5c9a390a4331378abb57d6899d1cf 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java @@ -84,12 +84,17 @@ public class TSDBConnection implements Connection { } } - public TSDBSubscribe createSubscribe() throws SQLException { - if (!this.connector.isClosed()) { - return new TSDBSubscribe(this.connector); - } else { + public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException { + if (this.connector.isClosed()) { throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); } + + long id = this.connector.subscribe(topic, sql, restart, 0); + if (id == 0) { + throw new SQLException(TSDBConstants.WrapErrMsg("failed to create subscription")); + } + + return new TSDBSubscribe(this.connector, id); } public PreparedStatement prepareStatement(String sql) throws SQLException { diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 13fa2eda81922692a87e98265bec542e88f56c86..bab3c79089b0d4cb746fdaf6e5e76d5174731e46 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -254,29 +254,29 @@ public class TSDBJNIConnector { private native int closeConnectionImp(long connection); /** - * Subscribe to a table in TSDB + * Create a subscription */ - public long subscribe(String topic, String sql, boolean restart, int period) { + long subscribe(String topic, String sql, boolean restart, int period) { return subscribeImp(this.taos, restart, topic, sql, period); } - public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period); + private native long subscribeImp(long connection, boolean restart, String topic, String sql, int period); /** - * Consume a subscribed table + * Consume a subscription */ - public long consume(long subscription) { - return this.consumeImp(subscription); + long consume(long subscription) { + return this.consumeImp(subscription); } private native long consumeImp(long subscription); /** - * Unsubscribe a table + * Unsubscribe, close a subscription * * @param subscription */ - public void unsubscribe(long subscription, boolean isKeep) { + void unsubscribe(long subscription, boolean isKeep) { unsubscribeImp(subscription, isKeep); } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java index e20c6a815c190cfdc8bf01e7b0e1afe676c3790f..deffd9aa2ae88802f71af5cbec66c5896cf4e19a 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java @@ -22,81 +22,28 @@ import java.util.concurrent.*; public class TSDBSubscribe { private TSDBJNIConnector connecter = null; - private static ScheduledExecutorService pool; - private static Map timerTaskMap = new ConcurrentHashMap<>(); - private static Map scheduledMap = new ConcurrentHashMap(); + private long id = 0; - private static class TimerInstance { - private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1); - } - - public static ScheduledExecutorService getTimerInstance() { - return TimerInstance.instance; - } - - public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException { + TSDBSubscribe(TSDBJNIConnector connecter, long id) throws SQLException { if (null != connecter) { this.connecter = connecter; + this.id = id; } else { throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); } } /** - * sync subscribe - * - * @param topic - * @param sql - * @param restart - * @param period - * @throws SQLException - */ - public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException { - if (this.connecter.isClosed()) { - throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); - } - if (period < 1000) { - throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES)); - } - return this.connecter.subscribe(topic, sql, restart, period); - } - - /** - * async subscribe + * consume * - * @param topic - * @param sql - * @param restart - * @param period - * @param callBack - * @throws SQLException + * @throws OperationsException, SQLException */ - public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException { + public TSDBResultSet consume() throws OperationsException, SQLException { if (this.connecter.isClosed()) { throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); } - final long subscription = this.connecter.subscribe(topic, sql, restart, period); - if (null != callBack) { - pool = getTimerInstance(); - TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack); - - timerTaskMap.put(subscription, timerTask); - - ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS); - scheduledMap.put(subscription, scheduledFuture); - } - return subscription; - } - - public TSDBResultSet consume(long subscription) throws OperationsException, SQLException { - if (this.connecter.isClosed()) { - throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); - } - if (0 == subscription) { - throw new OperationsException("Invalid use of consume"); - } - long resultSetPointer = this.connecter.consume(subscription); + long resultSetPointer = this.connecter.consume(this.id); if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) { throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); @@ -108,77 +55,16 @@ public class TSDBSubscribe { } /** - * cancel subscribe + * close subscription * - * @param subscription - * @param isKeep + * @param keepProgress * @throws SQLException */ - public void unsubscribe(long subscription, boolean isKeep) throws SQLException { + public void close(boolean keepProgress) throws SQLException { if (this.connecter.isClosed()) { throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); } - - if (null != timerTaskMap.get(subscription)) { - synchronized (timerTaskMap.get(subscription)) { - while (1 == timerTaskMap.get(subscription).getState()) { - try { - Thread.sleep(10); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - timerTaskMap.get(subscription).setState(2); - if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) { - timerTaskMap.get(subscription).cancel(); - timerTaskMap.remove(subscription); - scheduledMap.get(subscription).cancel(false); - scheduledMap.remove(subscription); - } - this.connecter.unsubscribe(subscription, isKeep); - } - } else { - this.connecter.unsubscribe(subscription, isKeep); - } - } - - class TSDBTimerTask extends TimerTask { - private long subscription; - private TSDBSubscribeCallBack callBack; - // 0: not running 1: running 2: cancel - private int state = 0; - - public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) { - this.subscription = subscription; - this.callBack = callBack; - } - - public int getState() { - return this.state; - } - - public void setState(int state) { - this.state = state; - } - - @Override - public void run() { - synchronized (this) { - if (2 == state) { - return; - } - - state = 1; - - try { - callBack.invoke(consume(subscription)); - } catch (Exception e) { - this.cancel(); - throw new RuntimeException(e); - } - state = 0; - } - } + this.connecter.unsubscribe(this.id, keepProgress); } } diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java deleted file mode 100644 index 6d4c6b1e94b5bb5c507c1cce2b5c87b31ae36648..0000000000000000000000000000000000000000 --- a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java +++ /dev/null @@ -1,92 +0,0 @@ -import com.taosdata.jdbc.*; -import org.apache.commons.lang3.StringUtils; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Properties; - -public class TestAsyncTSDBSubscribe { - public static void main(String[] args) throws SQLException { - String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " + - "-tname tableName -h host"; - if (args.length < 2) { - System.err.println(usage); - return; - } - - String dbName = ""; - String tName = ""; - String host = "localhost"; - String topic = ""; - for (int i = 0; i < args.length; i++) { - if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { - dbName = args[++i]; - } - if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { - tName = args[++i]; - } - if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { - host = args[++i]; - } - if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { - topic = args[++i]; - } - } - if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { - System.err.println(usage); - return; - } - - Connection connection = null; - long subscribId = 0; - try { - Class.forName("com.taosdata.jdbc.TSDBDriver"); - - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - - connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties); - String rawSql = "select * from " + tName + ";"; - TSDBSubscribe subscribe = ((TSDBConnection) connection).createSubscribe(); - subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first")); - long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second")); - int a = 0; - Thread.sleep(2000); - subscribe.unsubscribe(subscribId, true); - System.err.println("cancel subscribe"); - } catch (Exception e) { - e.printStackTrace(); - if (null != connection && !connection.isClosed()) { - connection.close(); - } - } - } - - private static class CallBack implements TSDBSubscribeCallBack { - private String name = ""; - - public CallBack(String name) { - this.name = name; - } - - @Override - public void invoke(TSDBResultSet resultSet) { - try { - while (null !=resultSet && resultSet.next()) { - System.out.print("callback_" + name + ": "); - for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - System.out.printf(i + ": " + resultSet.getString(i) + "\t"); - } - System.out.println(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - } -} diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java index 598ef4bbc02cd4beb7ba18bb1794a572fae16b13..df730efa69f5937b938f2556d39517a0aa15b3bb 100644 --- a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java +++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java @@ -2,82 +2,76 @@ import com.taosdata.jdbc.TSDBConnection; import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.TSDBResultSet; import com.taosdata.jdbc.TSDBSubscribe; -import org.apache.commons.lang3.StringUtils; import java.sql.Connection; import java.sql.DriverManager; import java.util.Properties; public class TestTSDBSubscribe { + + public static TSDBConnection connectTDengine(String host, String database) throws Exception { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + + String cs = String.format("jdbc:TAOS://%s:0/%s?user=root&password=taosdata", host, database); + return (TSDBConnection)DriverManager.getConnection(cs, properties); + } + + + public static void main(String[] args) throws Exception { - String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " + - "-topic topicName -tname tableName -h host"; + String usage = "java -Djava.ext.dirs=../ TestTSDBSubscribe [-host host] <-db database> <-topic topic> <-sql sql>"; if (args.length < 2) { System.err.println(usage); return; } - String dbName = ""; - String tName = ""; - String host = "localhost"; - String topic = ""; + String host = "localhost", database = "", topic = "", sql = ""; for (int i = 0; i < args.length; i++) { if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { - dbName = args[++i]; + database = args[++i]; } - if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { - tName = args[++i]; + if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { + topic = args[++i]; } - if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { + if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) { host = args[++i]; } - if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { - topic = args[++i]; + if ("-sql".equalsIgnoreCase(args[i]) && i < args.length - 1) { + sql = args[++i]; } } - if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { - System.err.println(usage); - return; + if (database.isEmpty() || topic.isEmpty() || sql.isEmpty()) { + System.err.println(usage); + return; } - Connection connection = null; - TSDBSubscribe subscribe = null; - long subscribId = 0; + TSDBConnection connection = null; + TSDBSubscribe sub = null; try { - Class.forName("com.taosdata.jdbc.TSDBDriver"); - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata" - , properties); - String rawSql = "select * from " + tName + ";"; - subscribe = ((TSDBConnection) connection).createSubscribe(); - subscribId = subscribe.subscribe(topic, rawSql, false, 1000); - int a = 0; - TSDBResultSet resSet = null; - while (true) { - Thread.sleep(900); - resSet = subscribe.consume(subscribId); + connection = connectTDengine(host, database); + sub = ((TSDBConnection) connection).subscribe(topic, sql, false); - while (resSet.next()) { - for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) { - System.out.printf(i + ": " + resSet.getString(i) + "\t"); - } - System.out.println("\n======" + a + "=========="); - } - - a++; - if (a >= 10) { - break; + int total = 0; + while(true) { + TSDBResultSet rs = sub.consume(); + int count = 0; + while(rs.next()) { + count++; } + total += count; + System.out.printf("%d rows consumed, total %d\n", count, total); + Thread.sleep(900); } } catch (Exception e) { e.printStackTrace(); } finally { - if (null != subscribe && 0 != subscribId) { - subscribe.unsubscribe(subscribId, true); + if (null != sub) { + sub.close(true); } if (null != connection) { connection.close(); diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java deleted file mode 100644 index c14624e683c213851e123cc9e1671aa26ceb27be..0000000000000000000000000000000000000000 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.taosdata.jdbc; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Properties; - -import static org.junit.Assert.assertTrue; - -public class AsyncSubscribeTest extends BaseTest { - Connection connection = null; - Statement statement = null; - String dbName = "test"; - String tName = "t0"; - String host = "localhost"; - String topic = "test"; - long subscribId = 0; - - @Before - public void createDatabase() throws SQLException { - try { - Class.forName("com.taosdata.jdbc.TSDBDriver"); - } catch (ClassNotFoundException e) { - return; - } - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata" - , properties); - - statement = connection.createStatement(); - statement.executeUpdate("create database if not exists " + dbName); - statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)"); - long ts = System.currentTimeMillis(); - for (int i = 0; i < 2; i++) { - ts += i; - String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")"; - statement.executeUpdate(sql); - } - } - - @Test - public void subscribe() throws Exception { - TSDBSubscribe subscribe = null; - try { - String rawSql = "select * from " + dbName + "." + tName + ";"; - System.out.println(rawSql); - subscribe = ((TSDBConnection) connection).createSubscribe(); - subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first")); - - assertTrue(subscribId > 0); - } catch (Exception e) { - e.printStackTrace(); - } - - Thread.sleep(2000); - subscribe.unsubscribe(subscribId, true); - } - - private static class CallBack implements TSDBSubscribeCallBack { - private String name = ""; - - public CallBack(String name) { - this.name = name; - } - - @Override - public void invoke(TSDBResultSet resultSet) { - try { - while (null != resultSet && resultSet.next()) { - System.out.print("callback_" + name + ": "); - for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - System.out.printf(i + ": " + resultSet.getString(i) + "\t"); - } - System.out.println(); - } - resultSet.close(); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - @After - public void close() throws Exception { - statement.executeQuery("drop database test"); - statement.close(); - connection.close(); - Thread.sleep(10); - } -} \ No newline at end of file diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java index d7f56ac4683cf0b9d983fa058a9871d2f116cdb2..2dc27adab72f1664b951c658209809a258b3a730 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java @@ -49,20 +49,16 @@ public class SubscribeTest extends BaseTest { @Test public void subscribe() throws Exception { TSDBSubscribe subscribe = null; - long subscribId = 0; try { String rawSql = "select * from " + dbName + "." + tName + ";"; System.out.println(rawSql); - subscribe = ((TSDBConnection) connection).createSubscribe(); - subscribId = subscribe.subscribe(topic, rawSql, false, 1000); - - assertTrue(subscribId > 0); + subscribe = ((TSDBConnection) connection).subscribe(topic, rawSql, false); int a = 0; while (true) { Thread.sleep(900); - TSDBResultSet resSet = subscribe.consume(subscribId); + TSDBResultSet resSet = subscribe.consume(); while (resSet.next()) { for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) { @@ -79,8 +75,8 @@ public class SubscribeTest extends BaseTest { } catch (Exception e) { e.printStackTrace(); } finally { - if (null != subscribe && 0 != subscribId) { - subscribe.unsubscribe(subscribId, true); + if (null != subscribe) { + subscribe.close(true); } } }