diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h index 8dbe63d75a73dd18a15bc1da8f99c7b8db774eea..d976b2be3902e6f3dd014781ea1e32b42cd84cc3 100644 --- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h +++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h @@ -142,8 +142,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp * Method: consumeImp * Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData; */ -JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp - (JNIEnv *, jobject, jlong, jint); +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp + (JNIEnv *, jobject, jlong); /* * Class: com_taosdata_jdbc_TSDBJNIConnector diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 228403c79d318d922f5571a9663b3c97bbffbbc8..27b28c8f33a6ce770f17e7a2f881ecb8ee77bd27 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -606,7 +606,7 @@ static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, in return rowobj; } -JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) { +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) { jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub); jniGetGlobalMethod(env); @@ -616,38 +616,14 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI int64_t start = taosGetTimestampMs(); int count = 0; - while (true) { - TAOS_RES * res = taos_consume(tsub); - if (res == NULL) { - jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); - return NULL; - } - - TAOS_FIELD *fields = taos_fetch_fields(res); - int num_fields = taos_num_fields(res); - while (true) { - TAOS_ROW row = taos_fetch_row(res); - if (row == NULL) { - break; - } - jobject rowobj = convert_one_row(env, row, fields, num_fields); - (*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj); - count++; - } + TAOS_RES *res = taos_consume(tsub); - if (count > 0) { - break; - } - if (timeout == -1) { - continue; - } - if (((int)(taosGetTimestampMs() - start)) >= timeout) { - jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub); - break; - } + if (res == NULL) { + jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); + return NULL; } - return rows; + return res; } JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) { diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java index 4640f6b446b6d285806408856489c5e950b37a81..062cb63cfd508800d2135494a8d45b6682a16fc1 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java @@ -84,6 +84,14 @@ public class TSDBConnection implements Connection { } } + public TSDBSubscribe createSubscribe() throws SQLException { + if (!this.connector.isClosed()) { + return new TSDBSubscribe(this.connector); + } else { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + } + public PreparedStatement prepareStatement(String sql) throws SQLException { if (!this.connector.isClosed()) { return new TSDBPreparedStatement(this.connector, sql); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 3adb601822567a6a7c515fa405801024e99a4609..ed428ea15faa0ca5ab38f93f9392c3fc9fa8f31d 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -22,8 +22,8 @@ public class TSDBJNIConnector { static volatile Boolean isInitialized = false; static { - System.loadLibrary("taos"); System.out.println("java.library.path:" + System.getProperty("java.library.path")); + System.loadLibrary("taos"); } /** @@ -261,31 +261,31 @@ public class TSDBJNIConnector { /** * Subscribe to a table in TSDB */ - public long subscribe(String host, String user, String password, String database, String table, long time, int period) { - return subscribeImp(host, user, password, database, table, time, period); + public long subscribe(String topic, String sql, boolean restart, int period) { + return subscribeImp(this.taos, restart, topic, sql, period); } - private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period); + public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period); /** * Consume a subscribed table */ - public TSDBResultSetRowData consume(long subscription) { + public long consume(long subscription) { return this.consumeImp(subscription); } - private native TSDBResultSetRowData consumeImp(long subscription); + private native long consumeImp(long subscription); /** * Unsubscribe a table * * @param subscription */ - public void unsubscribe(long subscription) { - unsubscribeImp(subscription); + public void unsubscribe(long subscription, boolean isKeep) { + unsubscribeImp(subscription, isKeep); } - private native void unsubscribeImp(long subscription); + private native void unsubscribeImp(long subscription, boolean isKeep); /** * Validate if a create table sql statement is correct without actually creating that table @@ -293,7 +293,7 @@ public class TSDBJNIConnector { public boolean validateCreateTableSql(String sql) { long connection = taos; int res = validateCreateTableSqlImp(connection, sql.getBytes()); - return res != 0 ? false : true; + return res == 0; } private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..ef83ed3a751f88e2dc248689d1552e12c54da7cb --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java @@ -0,0 +1,181 @@ +/*************************************************************************** + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + *****************************************************************************/ +package com.taosdata.jdbc; + +import javax.management.OperationsException; +import java.sql.SQLException; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.*; + +public class TSDBSubscribe { + private TSDBJNIConnector connecter = null; + private static ScheduledExecutorService pool; + private static Map timerTaskMap = new ConcurrentHashMap<>(); + private static Map scheduledMap = new ConcurrentHashMap(); + + private static class TimerInstance { + private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1); + } + + public static ScheduledExecutorService getTimerInstance() { + return TimerInstance.instance; + } + + public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException { + if (null != connecter) { + this.connecter = connecter; + } else { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + } + + /** + * sync subscribe + * + * @param topic + * @param sql + * @param restart + * @param period + * @throws SQLException + */ + public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + if (period < 1000) { + throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES)); + } + return this.connecter.subscribe(topic, sql, restart, period); + } + + /** + * async subscribe + * + * @param topic + * @param sql + * @param restart + * @param period + * @param callBack + * @throws SQLException + */ + public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + final long subscription = this.connecter.subscribe(topic, sql, restart, period); + if (null != callBack) { + pool = getTimerInstance(); + + TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack); + + timerTaskMap.put(subscription, timerTask); + + ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS); + scheduledMap.put(subscription, scheduledFuture); + } + return subscription; + } + + public TSDBResultSet consume(long subscription) throws OperationsException, SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + if (0 == subscription) { + throw new OperationsException("Invalid use of consume"); + } + long resultSetPointer = this.connecter.consume(subscription); + + if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) { + return null; + } else { + return new TSDBResultSet(this.connecter, resultSetPointer); + } + } + + /** + * cancel subscribe + * + * @param subscription + * @param isKeep + * @throws SQLException + */ + public void unsubscribe(long subscription, boolean isKeep) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + + synchronized (timerTaskMap.get(subscription)) { + while (1 == timerTaskMap.get(subscription).getState()) { + try { + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + timerTaskMap.get(subscription).setState(2); + if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) { + timerTaskMap.get(subscription).cancel(); + timerTaskMap.remove(subscription); + scheduledMap.get(subscription).cancel(false); + scheduledMap.remove(subscription); + } + this.connecter.unsubscribe(subscription, isKeep); + } + } + + class TSDBTimerTask extends TimerTask { + private long subscription; + private TSDBSubscribeCallBack callBack; + // 0: not running 1: running 2: cancel + private int state = 0; + + public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) { + this.subscription = subscription; + this.callBack = callBack; + } + + public int getState() { + return this.state; + } + + public void setState(int state) { + this.state = state; + } + + @Override + public void run() { + synchronized (this) { + if (2 == state) { + return; + } + + state = 1; + + try { + TSDBResultSet resultSet = consume(subscription); + callBack.invoke(resultSet); + } catch (Exception e) { + this.cancel(); + throw new RuntimeException(e); + } + state = 0; + } + } + } +} + diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java new file mode 100644 index 0000000000000000000000000000000000000000..c1b9b02fa85f8e2197f41d75f8f0c25b3f4c416c --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java @@ -0,0 +1,19 @@ +/*************************************************************************** + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + *****************************************************************************/ +package com.taosdata.jdbc; + +public interface TSDBSubscribeCallBack { + void invoke(TSDBResultSet resultSet); +} diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..5b2b6367ec5fe7cb46f0514b59931d8942f9bc74 --- /dev/null +++ b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java @@ -0,0 +1,83 @@ +import com.taosdata.jdbc.*; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; + +public class TestAsyncTSDBSubscribe { + public static void main(String[] args) { + String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " + + "-tname tableName -h host"; + if (args.length < 2) { + System.err.println(usage); + return; + } + + String dbName = ""; + String tName = ""; + String host = "localhost"; + String topic = ""; + for (int i = 0; i < args.length; i++) { + if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { + dbName = args[++i]; + } + if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { + tName = args[++i]; + } + if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { + host = args[++i]; + } + if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { + topic = args[++i]; + } + } + if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { + System.err.println(usage); + return; + } + + Connection connection = null; + TSDBSubscribe subscribe = null; + long subscribId = 0; + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties); + String rawSql = "select * from " + tName + ";"; + subscribe = ((TSDBConnection) connection).createSubscribe(); + subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first")); + long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second")); + int a = 0; + Thread.sleep(2000); + subscribe.unsubscribe(subscribId, true); + System.err.println("cancel subscribe"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static class CallBack implements TSDBSubscribeCallBack { + private String name = ""; + + public CallBack(String name) { + this.name = name; + } + + @Override + public void invoke(TSDBResultSet resultSet) { + try { + while (null !=resultSet && resultSet.next()) { + System.out.print("callback_" + name + ": "); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + System.out.printf(i + ": " + resultSet.getString(i) + "\t"); + } + System.out.println(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/connector/jdbc/src/test/java/TestPreparedStatement.java b/src/connector/jdbc/src/test/java/TestPreparedStatement.java index 2e2cc0ede6ae88c9b43c4890894378f13a3cc14a..fce2bd1428eead684a7524226c5f16d3e4cc7881 100644 --- a/src/connector/jdbc/src/test/java/TestPreparedStatement.java +++ b/src/connector/jdbc/src/test/java/TestPreparedStatement.java @@ -11,8 +11,14 @@ public class TestPreparedStatement { Class.forName("com.taosdata.jdbc.TSDBDriver"); Properties properties = new Properties(); properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117"); - Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties); + Connection connection = DriverManager.getConnection("jdbc:TAOS://10.211.55.3:0/log?user=root&password=taosdata", properties); + String createSql = "create table t (ts timestamp, speed int);"; + Statement statement = connection.createStatement(); + statement.executeQuery(createSql); String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY"; + if (1 < 2) { + return; + } // String[] params = new String[]{"ts", "c1"}; PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql); ResultSet resSet = pstmt.executeQuery(); diff --git a/src/connector/jdbc/src/test/java/TestTSDBResultSetRowData.java b/src/connector/jdbc/src/test/java/TestTSDBResultSetRowData.java new file mode 100644 index 0000000000000000000000000000000000000000..62d4c2ff6118b19fd7559bf8b23e2e3529339c18 --- /dev/null +++ b/src/connector/jdbc/src/test/java/TestTSDBResultSetRowData.java @@ -0,0 +1,35 @@ +import com.taosdata.jdbc.ColumnMetaData; +import com.taosdata.jdbc.DatabaseMetaDataResultSet; +import com.taosdata.jdbc.TSDBResultSetRowData; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +public class TestTSDBResultSetRowData { + public static void main(String[] args) throws SQLException { + DatabaseMetaDataResultSet resultSet = new DatabaseMetaDataResultSet(); + List columnMetaDataList = new ArrayList(1); + ColumnMetaData colMetaData = new ColumnMetaData(); + colMetaData.setColIndex(0); + colMetaData.setColName("TABLE_TYPE"); + colMetaData.setColSize(10); + colMetaData.setColType(8); + columnMetaDataList.add(colMetaData); + + List rowDataList = new ArrayList(2); + TSDBResultSetRowData rowData = new TSDBResultSetRowData(2); + rowData.setString(0, "TABLE"); + rowDataList.add(rowData); + rowData = new TSDBResultSetRowData(2); + rowData.setString(0, "STABLE"); + rowDataList.add(rowData); + + resultSet.setColumnMetaDataList(columnMetaDataList); + resultSet.setRowDataList(rowDataList); + + while (resultSet.next()) { + System.out.println(resultSet.getString(1)); + } + } +} diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java new file mode 100644 index 0000000000000000000000000000000000000000..f12924c8a61a3dbeeec56f7d2c712472d771e8e7 --- /dev/null +++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java @@ -0,0 +1,83 @@ +import com.taosdata.jdbc.TSDBConnection; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.TSDBResultSet; +import com.taosdata.jdbc.TSDBSubscribe; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; + +public class TestTSDBSubscribe { + public static void main(String[] args) throws Exception { + String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " + + "-topic topicName -tname tableName -h host"; + if (args.length < 2) { + System.err.println(usage); + return; + } + + String dbName = ""; + String tName = ""; + String host = "localhost"; + String topic = ""; + for (int i = 0; i < args.length; i++) { + if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { + dbName = args[++i]; + } + if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { + tName = args[++i]; + } + if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { + host = args[++i]; + } + if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { + topic = args[++i]; + } + } + if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { + System.err.println(usage); + return; + } + + Connection connection = null; + TSDBSubscribe subscribe = null; + long subscribId = 0; + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata" + , properties); + String rawSql = "select * from " + tName + ";"; + subscribe = ((TSDBConnection) connection).createSubscribe(); + subscribId = subscribe.subscribe(topic, rawSql, false, 1000); + int a = 0; + while (true) { + Thread.sleep(900); + TSDBResultSet resSet = subscribe.consume(subscribId); + + while (resSet.next()) { + for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) { + System.out.printf(i + ": " + resSet.getString(i) + "\t"); + } + System.out.println("\n======" + a + "=========="); + } + + a++; + if (a >= 10) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (null != subscribe && 0 != subscribId) { + subscribe.unsubscribe(subscribId, true); + } + if (null != connection) { + connection.close(); + } + } + } +}