diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c
index cec4737226fb6b2bb5966b63a1a5e72f0a98f71b..de39e62420f0cdd0e6218b49e594067394a3260c 100644
--- a/src/client/src/TSDBJNIConnector.c
+++ b/src/client/src/TSDBJNIConnector.c
@@ -344,7 +344,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(
STscObj *pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) {
- taos_free_result(pSql); // free result here
+ // taos_free_result(pSql); // free result here
jniTrace("jobj:%p, conn:%p, no resultset, %p", jobj, pObj, (void *)tres);
return 0;
} else {
diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml
index 36f1a1010c372b319fab7fceaf13c9c44689dc6b..0f441d097ebfa46279680a69470f63c5ae93e901 100755
--- a/src/connector/jdbc/pom.xml
+++ b/src/connector/jdbc/pom.xml
@@ -62,6 +62,12 @@
commons-lang3
${commons-lang3.version}
+
+ junit
+ junit
+ 4.8.2
+ test
+
@@ -98,6 +104,14 @@
true
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.12.4
+
+ true
+
+
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index 8bb7084604851c695961564ed2f8c0142accb7fb..46158210b1292b89db84c5da1fb6a84d9adee748 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -51,8 +51,6 @@ public class TSDBJNIConnector {
/**
* Returns the status of last result set in current connection
- *
- * @return
*/
public boolean isResultsetClosed() {
return this.isResultsetClosed;
@@ -112,7 +110,7 @@ public class TSDBJNIConnector {
*
* @throws SQLException
*/
- public int executeQuery(String sql) throws SQLException {
+ public long executeQuery(String sql) throws SQLException {
if (!this.isResultsetClosed) {
freeResultSet(taosResultSetPointer);
}
@@ -127,7 +125,6 @@ public class TSDBJNIConnector {
}
int code = this.getErrCode(pSql);
- affectedRows = code;
if (code < 0) {
affectedRows = -1;
if (code == TSDBConstants.JNI_TDENGINE_ERROR) {
@@ -146,7 +143,7 @@ public class TSDBJNIConnector {
if (taosResultSetPointer != TSDBConstants.JNI_NULL_POINTER) {
isResultsetClosed = false;
}
- return code;
+ return pSql;
}
private native long executeQueryImp(byte[] sqlBytes, long connection);
@@ -199,8 +196,6 @@ public class TSDBJNIConnector {
/**
* Close the open result set which is associated to the current connection. If the result set is already
* closed, return 0 for success.
- *
- * @return
*/
public int freeResultSet() {
int resCode = TSDBConstants.JNI_SUCCESS;
@@ -217,7 +212,7 @@ public class TSDBJNIConnector {
/**
* Get affected rows count
*/
- public int getAffectedRows(Long pSql) {
+ public int getAffectedRows(long pSql) {
int affectedRows = this.affectedRows;
if (affectedRows < 0) {
affectedRows = this.getAffectedRowsImp(this.taos, pSql);
@@ -225,7 +220,7 @@ public class TSDBJNIConnector {
return affectedRows;
}
- private native int getAffectedRowsImp(long connection, Long pSql);
+ private native int getAffectedRowsImp(long connection, long pSql);
/**
* Get schema metadata
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBAsyncSubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBAsyncSubscribeTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..530b47d2773133e76fc267b31fedbabbb0dff5c6
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBAsyncSubscribeTest.java
@@ -0,0 +1,96 @@
+package com.taosdata.jdbc;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+public class TSDBAsyncSubscribeTest {
+ Connection connection = null;
+ Statement statement = null;
+ String dbName = "test";
+ String tName = "t0";
+ String host = "localhost";
+ String topic = "test";
+ long subscribId = 0;
+
+// @Before
+ public void createDatabase() throws SQLException {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ return;
+ }
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata"
+ , properties);
+
+ statement = connection.createStatement();
+ statement.executeUpdate("create database if not exists " + dbName);
+ statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < 5; i++) {
+ ts += i;
+ statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")");
+ }
+ }
+
+// @Test
+ public void subscribe() throws Exception {
+ TSDBSubscribe subscribe = null;
+ try {
+ String rawSql = "select * from " + dbName + "." + tName + ";";
+ System.out.println(rawSql);
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
+
+ assertTrue(subscribId > 0);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ Thread.sleep(2000);
+ subscribe.unsubscribe(subscribId, true);
+ }
+
+ private static class CallBack implements TSDBSubscribeCallBack {
+ private String name = "";
+
+ public CallBack(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void invoke(TSDBResultSet resultSet) {
+ System.out.println("resultSet");
+ try {
+ while (null != resultSet && resultSet.next()) {
+ System.out.print("callback_" + name + ": ");
+ for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resultSet.getString(i) + "\t");
+ }
+ System.out.println();
+ }
+ resultSet.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+// @After
+ public void close() throws Exception {
+ statement.executeQuery("drop database test");
+ statement.close();
+ connection.close();
+ }
+}
\ No newline at end of file
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBImportTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBImportTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..1fd15b2dfdd0a8c6e21f17c8e5c37b1bab0fc867
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBImportTest.java
@@ -0,0 +1,77 @@
+package com.taosdata.jdbc;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TSDBImportTest {
+ Connection connection = null;
+ Statement statement = null;
+ String dbName = "test";
+ String tName = "t0";
+ String host = "localhost";
+
+ @Before
+ public void createDatabase() throws SQLException {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ return;
+ }
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata"
+ , properties);
+
+ statement = connection.createStatement();
+ statement.executeUpdate("drop database if exists " + dbName);
+ statement.executeUpdate("create database if not exists " + dbName);
+ statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
+
+ }
+
+ @Test
+ public void insertInto() throws Exception {
+ long ts = System.currentTimeMillis();
+
+ for (int i = 0; i < 50; i++) {
+ ts += i;
+ int row = statement.executeUpdate("insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")");
+ System.out.println("insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")" + "\t" + row);
+ assertEquals(1, row);
+ }
+ }
+
+ @Test
+ public void importInto() throws Exception {
+ // 避免时间重复
+ long ts = System.currentTimeMillis() + 1000;
+
+ StringBuilder sqlBuilder = new StringBuilder("insert into ").append(dbName).append(".").append(tName).append(" values ");
+
+ for (int i = 0; i < 50; i++) {
+ int a = i / 5;
+ long t = ts + a;
+ sqlBuilder.append("(").append(t).append(",").append((100 + i)).append(",").append(i).append(") ");
+ }
+ System.out.println(sqlBuilder.toString());
+ int rows = statement.executeUpdate(sqlBuilder.toString());
+ System.out.println(rows);
+ assertEquals(10, rows);
+ }
+
+ @After
+ public void close() throws Exception {
+ statement.executeQuery("drop database " + dbName);
+ statement.close();
+ connection.close();
+ }
+}
diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBSubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBSubscribeTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..b7c631358769667afb10a4ab5d9dc06ab6c5f6cb
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/TSDBSubscribeTest.java
@@ -0,0 +1,90 @@
+package com.taosdata.jdbc;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+public class TSDBSubscribeTest {
+ Connection connection = null;
+ Statement statement = null;
+ String dbName = "test";
+ String tName = "t0";
+ String host = "localhost";
+ String topic = "test";
+
+// @Before
+ public void createDatabase() throws SQLException {
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ } catch (ClassNotFoundException e) {
+ return;
+ }
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata"
+ , properties);
+
+ statement = connection.createStatement();
+ statement.executeUpdate("create database if not exists " + dbName);
+ statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < 5; i++) {
+ ts += i;
+ statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")");
+ }
+ }
+
+// @Test
+ public void subscribe() throws Exception {
+ TSDBSubscribe subscribe = null;
+ long subscribId = 0;
+ try {
+
+ String rawSql = "select * from " + dbName + "." + tName + ";";
+ System.out.println(rawSql);
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
+
+ assertTrue(subscribId > 0);
+
+ int a = 0;
+ while (true) {
+ Thread.sleep(900);
+ TSDBResultSet resSet = subscribe.consume(subscribId);
+
+ while (resSet.next()) {
+ for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resSet.getString(i) + "\t");
+ }
+ System.out.println("\n======" + a + "==========");
+ }
+ resSet.close();
+ a++;
+ if (a >= 3) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (null != subscribe && 0 != subscribId) {
+ subscribe.unsubscribe(subscribId, true);
+ }
+ }
+ }
+
+// @After
+ public void close() throws Exception {
+ statement.executeQuery("drop database " + dbName);
+ statement.close();
+ connection.close();
+ }
+}
\ No newline at end of file