From 7e02d5ef4b074cb6f26b53b757821c38ce291c73 Mon Sep 17 00:00:00 2001 From: Shuaiqiang Chang Date: Tue, 2 Jun 2020 11:54:30 +0800 Subject: [PATCH] feat:taos-tools subscribe --- .../jni/com_taosdata_jdbc_TSDBJNIConnector.h | 2 +- src/client/src/TSDBJNIConnector.c | 99 +-------- .../jdbc/DatabaseMetaDataResultSet.java | 9 + .../com/taosdata/jdbc/TSDBConnection.java | 8 + .../com/taosdata/jdbc/TSDBJNIConnector.java | 16 +- .../java/com/taosdata/jdbc/TSDBResultSet.java | 180 ++++++++--------- .../java/com/taosdata/jdbc/TSDBSubscribe.java | 185 +++++++++++++++++ .../taosdata/jdbc/TSDBSubscribeCallBack.java | 19 ++ .../src/test/java/TestAsyncTSDBSubscribe.java | 83 ++++++++ .../src/test/java/TestPreparedStatement.java | 6 +- .../jdbc/src/test/java/TestTSDBSubscribe.java | 83 ++++++++ src/kit/taosnetwork/client.c | 128 ++++++++++++ src/kit/taosnetwork/server.c | 190 ++++++++++++++++++ 13 files changed, 813 insertions(+), 195 deletions(-) create mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java create mode 100644 src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java create mode 100644 src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java create mode 100644 src/connector/jdbc/src/test/java/TestTSDBSubscribe.java create mode 100644 src/kit/taosnetwork/client.c create mode 100644 src/kit/taosnetwork/server.c diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h index 8dbe63d75a..713ab2111d 100644 --- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h +++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h @@ -143,7 +143,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp * Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData; */ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp - (JNIEnv *, jobject, jlong, jint); + (JNIEnv *, jobject, jlong); /* * Class: com_taosdata_jdbc_TSDBJNIConnector diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 6ab1b73d1e..98a19184ed 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -552,107 +552,20 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI return sub; } -static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, int num_fields) { - jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields); - jniTrace("created a rowdata object, rowobj:%p", rowobj); - - for (int i = 0; i < num_fields; i++) { - if (row[i] == NULL) { - continue; - } - - switch (fields[i].type) { - case TSDB_DATA_TYPE_BOOL: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1)); - break; - case TSDB_DATA_TYPE_TINYINT: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i])); - break; - case TSDB_DATA_TYPE_SMALLINT: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i])); - break; - case TSDB_DATA_TYPE_INT: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]); - break; - case TSDB_DATA_TYPE_BIGINT: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i])); - break; - case TSDB_DATA_TYPE_FLOAT: { - float fv = 0; - fv = GET_FLOAT_VAL(row[i]); - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv); - } - break; - case TSDB_DATA_TYPE_DOUBLE:{ - double dv = 0; - dv = GET_DOUBLE_VAL(row[i]); - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv); - } - break; - case TSDB_DATA_TYPE_BINARY: { - char tmp[TSDB_MAX_BYTES_PER_ROW] = {0}; - strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp)); - - memset(tmp, 0, (size_t) fields[i].bytes); - break; - } - case TSDB_DATA_TYPE_NCHAR: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i, - jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes)); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - (*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i])); - break; - default: - break; - } - } - return rowobj; -} - -JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) { +JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) { jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub); jniGetGlobalMethod(env); TAOS_SUB *tsub = (TAOS_SUB *)sub; - jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp); - - int64_t start = taosGetTimestampMs(); - int count = 0; - while (true) { - TAOS_RES * res = taos_consume(tsub); - if (res == NULL) { - jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); - return NULL; - } - - TAOS_FIELD *fields = taos_fetch_fields(res); - int num_fields = taos_num_fields(res); - while (true) { - TAOS_ROW row = taos_fetch_row(res); - if (row == NULL) { - break; - } - jobject rowobj = convert_one_row(env, row, fields, num_fields); - (*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj); - count++; - } + TAOS_RES *res = taos_consume(tsub); - if (count > 0) { - break; - } - if (timeout == -1) { - continue; - } - if (((int)(taosGetTimestampMs() - start)) >= timeout) { - jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub); - break; - } + if (res == NULL) { + jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub); + return 0l; } - return rows; + return res; } JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) { diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java index 027d2197a3..9ee66472a3 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java @@ -102,41 +102,49 @@ public class DatabaseMetaDataResultSet implements ResultSet { @Override public byte getByte(int columnIndex) throws SQLException { + columnIndex--; return (byte) rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public short getShort(int columnIndex) throws SQLException { + columnIndex--; return (short) rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public int getInt(int columnIndex) throws SQLException { + columnIndex--; return rowCursor.getInt(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public long getLong(int columnIndex) throws SQLException { + columnIndex--; return rowCursor.getLong(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public float getFloat(int columnIndex) throws SQLException { + columnIndex--; return rowCursor.getFloat(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public double getDouble(int columnIndex) throws SQLException { + columnIndex--; return rowCursor.getDouble(columnIndex, columnMetaDataList.get(columnIndex).getColType()); } @Override public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { + columnIndex--; return new BigDecimal(rowCursor.getDouble(columnIndex, columnMetaDataList.get(columnIndex).getColType())); } @Override public byte[] getBytes(int columnIndex) throws SQLException { + columnIndex--; return (rowCursor.getString(columnIndex, columnMetaDataList.get(columnIndex).getColType())).getBytes(); } @@ -152,6 +160,7 @@ public class DatabaseMetaDataResultSet implements ResultSet { @Override public Timestamp getTimestamp(int columnIndex) throws SQLException { + columnIndex--; return rowCursor.getTimestamp(columnIndex); } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java index 4640f6b446..062cb63cfd 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java @@ -84,6 +84,14 @@ public class TSDBConnection implements Connection { } } + public TSDBSubscribe createSubscribe() throws SQLException { + if (!this.connector.isClosed()) { + return new TSDBSubscribe(this.connector); + } else { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + } + public PreparedStatement prepareStatement(String sql) throws SQLException { if (!this.connector.isClosed()) { return new TSDBPreparedStatement(this.connector, sql); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java index 3adb601822..d7f73a3fca 100755 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java @@ -261,31 +261,31 @@ public class TSDBJNIConnector { /** * Subscribe to a table in TSDB */ - public long subscribe(String host, String user, String password, String database, String table, long time, int period) { - return subscribeImp(host, user, password, database, table, time, period); + public long subscribe(String topic, String sql, boolean restart, int period) { + return subscribeImp(this.taos, restart, topic, sql, period); } - private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period); + public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period); /** * Consume a subscribed table */ - public TSDBResultSetRowData consume(long subscription) { + public long consume(long subscription) { return this.consumeImp(subscription); } - private native TSDBResultSetRowData consumeImp(long subscription); + private native long consumeImp(long subscription); /** * Unsubscribe a table * * @param subscription */ - public void unsubscribe(long subscription) { - unsubscribeImp(subscription); + public void unsubscribe(long subscription, boolean isKeep) { + unsubscribeImp(subscription, isKeep); } - private native void unsubscribeImp(long subscription); + private native void unsubscribeImp(long subscription, boolean isKeep); /** * Validate if a create table sql statement is correct without actually creating that table diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java index 8acf779756..961633b8ae 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java @@ -51,47 +51,47 @@ public class TSDBResultSet implements ResultSet { private boolean lastWasNull = false; private final int COLUMN_INDEX_START_VALUE = 1; - public TSDBJNIConnector getJniConnector() { - return jniConnector; - } + public TSDBJNIConnector getJniConnector() { + return jniConnector; + } - public void setJniConnector(TSDBJNIConnector jniConnector) { - this.jniConnector = jniConnector; - } + public void setJniConnector(TSDBJNIConnector jniConnector) { + this.jniConnector = jniConnector; + } - public long getResultSetPointer() { - return resultSetPointer; - } + public long getResultSetPointer() { + return resultSetPointer; + } - public void setResultSetPointer(long resultSetPointer) { - this.resultSetPointer = resultSetPointer; - } + public void setResultSetPointer(long resultSetPointer) { + this.resultSetPointer = resultSetPointer; + } - public List getColumnMetaDataList() { - return columnMetaDataList; - } + public List getColumnMetaDataList() { + return columnMetaDataList; + } - public void setColumnMetaDataList(List columnMetaDataList) { - this.columnMetaDataList = columnMetaDataList; - } + public void setColumnMetaDataList(List columnMetaDataList) { + this.columnMetaDataList = columnMetaDataList; + } - public TSDBResultSetRowData getRowData() { - return rowData; - } + public TSDBResultSetRowData getRowData() { + return rowData; + } - public void setRowData(TSDBResultSetRowData rowData) { - this.rowData = rowData; - } + public void setRowData(TSDBResultSetRowData rowData) { + this.rowData = rowData; + } - public boolean isLastWasNull() { - return lastWasNull; - } + public boolean isLastWasNull() { + return lastWasNull; + } - public void setLastWasNull(boolean lastWasNull) { - this.lastWasNull = lastWasNull; - } + public void setLastWasNull(boolean lastWasNull) { + this.lastWasNull = lastWasNull; + } - public TSDBResultSet() { + public TSDBResultSet() { } public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException { @@ -119,7 +119,7 @@ public class TSDBResultSet implements ResultSet { public boolean next() throws SQLException { if (rowData != null) { - this.rowData.clear(); + this.rowData.clear(); } int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData); @@ -154,119 +154,119 @@ public class TSDBResultSet implements ResultSet { public String getString(int columnIndex) throws SQLException { String res = null; int colIndex = getTrueColumnIndex(columnIndex); - - this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + + this.lastWasNull = this.rowData.wasNull(colIndex); + if (!lastWasNull) { + res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public boolean getBoolean(int columnIndex) throws SQLException { - boolean res = false; + boolean res = false; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); if (!lastWasNull) { - res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public byte getByte(int columnIndex) throws SQLException { - byte res = 0; + byte res = 0; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); if (!lastWasNull) { - res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public short getShort(int columnIndex) throws SQLException { - short res = 0; + short res = 0; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); if (!lastWasNull) { - res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public int getInt(int columnIndex) throws SQLException { - int res = 0; + int res = 0; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + if (!lastWasNull) { + res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public long getLong(int columnIndex) throws SQLException { - long res = 0l; + long res = 0l; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + if (!lastWasNull) { + res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public float getFloat(int columnIndex) throws SQLException { - float res = 0; - int colIndex = getTrueColumnIndex(columnIndex); + float res = 0; + int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + if (!lastWasNull) { + res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } public double getDouble(int columnIndex) throws SQLException { - double res = 0; + double res = 0; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType()); - } + if (!lastWasNull) { + res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType()); + } return res; } /* * (non-Javadoc) - * + * * @see java.sql.ResultSet#getBigDecimal(int, int) - * + * * @deprecated Use {@code getBigDecimal(int columnIndex)} or {@code * getBigDecimal(String columnLabel)} */ @Deprecated public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { - BigDecimal res = null; + BigDecimal res = null; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType())); - } + if (!lastWasNull) { + res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType())); + } return res; } public byte[] getBytes(int columnIndex) throws SQLException { - byte[] res = null; + byte[] res = null; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes(); - } + if (!lastWasNull) { + res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes(); + } return res; } @@ -281,13 +281,13 @@ public class TSDBResultSet implements ResultSet { } public Timestamp getTimestamp(int columnIndex) throws SQLException { - Timestamp res = null; + Timestamp res = null; int colIndex = getTrueColumnIndex(columnIndex); this.lastWasNull = this.rowData.wasNull(colIndex); - if (!lastWasNull) { - res = this.rowData.getTimestamp(colIndex); - } + if (!lastWasNull) { + res = this.rowData.getTimestamp(colIndex); + } return res; } @@ -297,9 +297,9 @@ public class TSDBResultSet implements ResultSet { /* * (non-Javadoc) - * + * * @see java.sql.ResultSet#getUnicodeStream(int) - * + * * * @deprecated use getCharacterStream in place of * getUnicodeStream */ @@ -409,13 +409,13 @@ public class TSDBResultSet implements ResultSet { } public int findColumn(String columnLabel) throws SQLException { - Iterator colMetaDataIt = this.columnMetaDataList.iterator(); - while (colMetaDataIt.hasNext()) { - ColumnMetaData colMetaData = colMetaDataIt.next(); - if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) { - return colMetaData.getColIndex() + 1; - } - } + Iterator colMetaDataIt = this.columnMetaDataList.iterator(); + while (colMetaDataIt.hasNext()) { + ColumnMetaData colMetaData = colMetaDataIt.next(); + if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) { + return colMetaData.getColIndex() + 1; + } + } throw new SQLException(TSDBConstants.INVALID_VARIABLES); } @@ -882,7 +882,7 @@ public class TSDBResultSet implements ResultSet { } public String getNString(int columnIndex) throws SQLException { - int colIndex = getTrueColumnIndex(columnIndex); + int colIndex = getTrueColumnIndex(columnIndex); return (String) rowData.get(colIndex); } @@ -1017,17 +1017,17 @@ public class TSDBResultSet implements ResultSet { public T getObject(String columnLabel, Class type) throws SQLException { throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG); } - + private int getTrueColumnIndex(int columnIndex) throws SQLException { if (columnIndex < this.COLUMN_INDEX_START_VALUE) { throw new SQLException("Column Index out of range, " + columnIndex + " < " + this.COLUMN_INDEX_START_VALUE); } - + int numOfCols = this.columnMetaDataList.size(); if (columnIndex > numOfCols) { throw new SQLException("Column Index out of range, " + columnIndex + " > " + numOfCols); } - + return columnIndex - 1; } } diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java new file mode 100644 index 0000000000..3b479aafc3 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java @@ -0,0 +1,185 @@ +/*************************************************************************** + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + *****************************************************************************/ +package com.taosdata.jdbc; + +import javax.management.OperationsException; +import java.sql.SQLException; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.*; + +public class TSDBSubscribe { + private TSDBJNIConnector connecter = null; + private static ScheduledExecutorService pool; + private static Map timerTaskMap = new ConcurrentHashMap<>(); + private static Map scheduledMap = new ConcurrentHashMap(); + + private static class TimerInstance { + private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1); + } + + public static ScheduledExecutorService getTimerInstance() { + return TimerInstance.instance; + } + + public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException { + if (null != connecter) { + this.connecter = connecter; + } else { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + } + + /** + * sync subscribe + * + * @param topic + * @param sql + * @param restart + * @param period + * @throws SQLException + */ + public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + if (period < 1000) { + throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES)); + } + return this.connecter.subscribe(topic, sql, restart, period); + } + + /** + * async subscribe + * + * @param topic + * @param sql + * @param restart + * @param period + * @param callBack + * @throws SQLException + */ + public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + final long subscription = this.connecter.subscribe(topic, sql, restart, period); + if (null != callBack) { + pool = getTimerInstance(); + + TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack); + + timerTaskMap.put(subscription, timerTask); + + ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS); + scheduledMap.put(subscription, scheduledFuture); + } + return subscription; + } + + public TSDBResultSet consume(long subscription) throws OperationsException, SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + if (0 == subscription) { + throw new OperationsException("Invalid use of consume"); + } + long resultSetPointer = this.connecter.consume(subscription); + + if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) { + return null; + } else { + return new TSDBResultSet(this.connecter, resultSetPointer); + } + } + + /** + * cancel subscribe + * + * @param subscription + * @param isKeep + * @throws SQLException + */ + public void unsubscribe(long subscription, boolean isKeep) throws SQLException { + if (this.connecter.isClosed()) { + throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL)); + } + + if (null != timerTaskMap.get(subscription)) { + synchronized (timerTaskMap.get(subscription)) { + while (1 == timerTaskMap.get(subscription).getState()) { + try { + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + timerTaskMap.get(subscription).setState(2); + if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) { + timerTaskMap.get(subscription).cancel(); + timerTaskMap.remove(subscription); + scheduledMap.get(subscription).cancel(false); + scheduledMap.remove(subscription); + } + this.connecter.unsubscribe(subscription, isKeep); + } + } else { + this.connecter.unsubscribe(subscription, isKeep); + } + } + + class TSDBTimerTask extends TimerTask { + private long subscription; + private TSDBSubscribeCallBack callBack; + // 0: not running 1: running 2: cancel + private int state = 0; + + public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) { + this.subscription = subscription; + this.callBack = callBack; + } + + public int getState() { + return this.state; + } + + public void setState(int state) { + this.state = state; + } + + @Override + public void run() { + synchronized (this) { + if (2 == state) { + return; + } + + state = 1; + + try { + TSDBResultSet resultSet = consume(subscription); + callBack.invoke(resultSet); + } catch (Exception e) { + this.cancel(); + throw new RuntimeException(e); + } + state = 0; + } + } + } +} + diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java new file mode 100644 index 0000000000..c1b9b02fa8 --- /dev/null +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java @@ -0,0 +1,19 @@ +/*************************************************************************** + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + *****************************************************************************/ +package com.taosdata.jdbc; + +public interface TSDBSubscribeCallBack { + void invoke(TSDBResultSet resultSet); +} diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java new file mode 100644 index 0000000000..5b2b6367ec --- /dev/null +++ b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java @@ -0,0 +1,83 @@ +import com.taosdata.jdbc.*; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; + +public class TestAsyncTSDBSubscribe { + public static void main(String[] args) { + String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " + + "-tname tableName -h host"; + if (args.length < 2) { + System.err.println(usage); + return; + } + + String dbName = ""; + String tName = ""; + String host = "localhost"; + String topic = ""; + for (int i = 0; i < args.length; i++) { + if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { + dbName = args[++i]; + } + if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { + tName = args[++i]; + } + if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { + host = args[++i]; + } + if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { + topic = args[++i]; + } + } + if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { + System.err.println(usage); + return; + } + + Connection connection = null; + TSDBSubscribe subscribe = null; + long subscribId = 0; + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties); + String rawSql = "select * from " + tName + ";"; + subscribe = ((TSDBConnection) connection).createSubscribe(); + subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first")); + long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second")); + int a = 0; + Thread.sleep(2000); + subscribe.unsubscribe(subscribId, true); + System.err.println("cancel subscribe"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static class CallBack implements TSDBSubscribeCallBack { + private String name = ""; + + public CallBack(String name) { + this.name = name; + } + + @Override + public void invoke(TSDBResultSet resultSet) { + try { + while (null !=resultSet && resultSet.next()) { + System.out.print("callback_" + name + ": "); + for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { + System.out.printf(i + ": " + resultSet.getString(i) + "\t"); + } + System.out.println(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/connector/jdbc/src/test/java/TestPreparedStatement.java b/src/connector/jdbc/src/test/java/TestPreparedStatement.java index 2e2cc0ede6..3b84645b5b 100644 --- a/src/connector/jdbc/src/test/java/TestPreparedStatement.java +++ b/src/connector/jdbc/src/test/java/TestPreparedStatement.java @@ -10,9 +10,9 @@ public class TestPreparedStatement { try { Class.forName("com.taosdata.jdbc.TSDBDriver"); Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117"); - Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties); - String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY"; + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost"); + Connection connection = DriverManager.getConnection("jdbc:TAOS://localhost:0/?user=root&password=taosdata", properties); + String rawSql = "select * from test.log0601"; // String[] params = new String[]{"ts", "c1"}; PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql); ResultSet resSet = pstmt.executeQuery(); diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java new file mode 100644 index 0000000000..f12924c8a6 --- /dev/null +++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java @@ -0,0 +1,83 @@ +import com.taosdata.jdbc.TSDBConnection; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.TSDBResultSet; +import com.taosdata.jdbc.TSDBSubscribe; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; + +public class TestTSDBSubscribe { + public static void main(String[] args) throws Exception { + String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " + + "-topic topicName -tname tableName -h host"; + if (args.length < 2) { + System.err.println(usage); + return; + } + + String dbName = ""; + String tName = ""; + String host = "localhost"; + String topic = ""; + for (int i = 0; i < args.length; i++) { + if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) { + dbName = args[++i]; + } + if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) { + tName = args[++i]; + } + if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) { + host = args[++i]; + } + if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) { + topic = args[++i]; + } + } + if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) { + System.err.println(usage); + return; + } + + Connection connection = null; + TSDBSubscribe subscribe = null; + long subscribId = 0; + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host); + connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata" + , properties); + String rawSql = "select * from " + tName + ";"; + subscribe = ((TSDBConnection) connection).createSubscribe(); + subscribId = subscribe.subscribe(topic, rawSql, false, 1000); + int a = 0; + while (true) { + Thread.sleep(900); + TSDBResultSet resSet = subscribe.consume(subscribId); + + while (resSet.next()) { + for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) { + System.out.printf(i + ": " + resSet.getString(i) + "\t"); + } + System.out.println("\n======" + a + "=========="); + } + + a++; + if (a >= 10) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (null != subscribe && 0 != subscribId) { + subscribe.unsubscribe(subscribId, true); + } + if (null != connection) { + connection.close(); + } + } + } +} diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c new file mode 100644 index 0000000000..545ff79d80 --- /dev/null +++ b/src/kit/taosnetwork/client.c @@ -0,0 +1,128 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 200 + +typedef struct { + int port; + char *host[15]; +} info; + +void *checkPort(void *sarg) { + info *pinfo = (info *)sarg; + int port = pinfo->port; + char *host = *pinfo->host; + int clientSocket; + + struct sockaddr_in serverAddr; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int iDataNum; + if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket"); + return NULL; + } + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(port); + + serverAddr.sin_addr.s_addr = inet_addr(host); + + printf("=================================\n"); + if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { + perror("connect"); + return NULL; + } + printf("Connect to: %s:%d...success\n", host, port); + + sprintf(sendbuf, "send port_%d", port); + send(clientSocket, sendbuf, strlen(sendbuf), 0); + printf("Send msg_%d: %s\n", port, sendbuf); + + recvbuf[0] = '\0'; + iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0); + recvbuf[iDataNum] = '\0'; + printf("Read ack msg_%d: %s\n", port, recvbuf); + + printf("=================================\n"); + close(clientSocket); + return NULL; +} + +void *checkUPort(void *sarg) { + info *pinfo = (info *)sarg; + int port = pinfo->port; + char *host = *pinfo->host; + int clientSocket; + + struct sockaddr_in serverAddr; + char sendbuf[BUFFER_SIZE]; + char recvbuf[BUFFER_SIZE]; + int iDataNum; + if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + perror("socket"); + return NULL; + } + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(port); + + serverAddr.sin_addr.s_addr = inet_addr(host); + + printf("=================================\n"); + // if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { + // perror("connect"); + // return NULL; + // } + + // printf("Connect to: %s:%d...success\n", host, port); + + sprintf(sendbuf, "send msg port_%d by udp", port); + + socklen_t sin_size = sizeof(*(struct sockaddr*)&serverAddr); + + sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size); + + printf("Send msg_%d by udp: %s\n", port, sendbuf); + + recvbuf[0] = '\0'; + iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); + recvbuf[iDataNum] = '\0'; + printf("Read ack msg_%d from udp: %s\n", port, recvbuf); + + printf("=================================\n"); + close(clientSocket); + return NULL; +} + +int main() { + int port = 6020; + char *host = "127.0.0.1"; + info *infos = malloc(10 * sizeof(info)); + info *uinfos = malloc(10 * sizeof(info)); + + for (size_t i = 0; i < 10; i++) { + port++; + printf("For test: %s:%d\n", host, port); + + info *pinfo = infos++; + *pinfo->host = host; + pinfo->port = port; + checkPort(pinfo); + + info *uinfo = uinfos++; + *uinfo->host = host; + uinfo->port = port; + checkUPort(uinfo); + } +} \ No newline at end of file diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c new file mode 100644 index 0000000000..2533c53ca1 --- /dev/null +++ b/src/kit/taosnetwork/server.c @@ -0,0 +1,190 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 200 + +typedef struct { + int port; + int type; // 0: tcp, 1: udo, default: 0 +} info; + +static void *bindPort(void *sarg) { + info *pinfo = (info *)sarg; + int port = pinfo->port; + int type = pinfo->type; + int serverSocket; + + struct sockaddr_in server_addr; + struct sockaddr_in clientAddr; + int addr_len = sizeof(clientAddr); + int client; + char buffer[BUFFER_SIZE]; + int iDataNum; + + if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + perror("socket"); + return NULL; + } + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { + perror("connect"); + return NULL; + } + + if (listen(serverSocket, 5) < 0) { + perror("listen"); + return NULL; + } + + printf("Bind port: %d success\n", port); + while (1) { + client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); + if (client < 0) { + perror("accept"); + continue; + } + printf("=================================\n"); + + printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); + while (1) { + buffer[0] = '\0'; + iDataNum = recv(client, buffer, BUFFER_SIZE, 0); + + if (iDataNum < 0) { + perror("recv null"); + continue; + } + if (iDataNum > 0) { + buffer[iDataNum] = '\0'; + printf("read msg:%s\n", buffer); + if (strcmp(buffer, "quit") == 0) break; + buffer[0] = '\0'; + + sprintf(buffer, "ack port_%d", port); + printf("send ack msg:%s\n", buffer); + + send(client, buffer, strlen(buffer), 0); + break; + } + } + printf("=================================\n"); + } + close(serverSocket); + return NULL; +} + +static void *bindUPort(void *sarg) { + info *pinfo = (info *)sarg; + int port = pinfo->port; + int type = pinfo->type; + int serverSocket; + + struct sockaddr_in server_addr; + struct sockaddr_in clientAddr; + int addr_len = sizeof(clientAddr); + int client; + char buffer[BUFFER_SIZE]; + int iDataNum; + + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + perror("socket"); + return NULL; + } + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + server_addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { + perror("connect"); + return NULL; + } + + socklen_t sin_size; + printf("Bind port: %d success\n", port); + + while (1) { + buffer[0] = '\0'; + + sin_size = sizeof(*(struct sockaddr *)&server_addr); + + iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size); + + if (iDataNum < 0) { + perror("recvfrom null"); + continue; + } + if (iDataNum > 0) { + printf("=================================\n"); + + printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); + buffer[iDataNum] = '\0'; + printf("Read msg from udp:%s\n", buffer); + if (strcmp(buffer, "quit") == 0) break; + buffer[0] = '\0'; + + sprintf(buffer, "ack port_%d by udp", port); + printf("Send ack msg by udp:%s\n", buffer); + + sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size); + + send(client, buffer, strlen(buffer), 0); + printf("=================================\n"); + } + } + + close(serverSocket); + return NULL; +} + + +int main() { + int port = 6020; + pthread_t *pids = malloc(20 * sizeof(pthread_t)); + info * infos = malloc(10 * sizeof(info)); + info * uinfos = malloc(10 * sizeof(info)); + + for (size_t i = 0; i < 10; i++) { + port++; + + info *pinfo = infos++; + pinfo->port = port; + + if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程 + { //创建线程失败 + printf("创建线程失败: %d.\n", port); + exit(0); + } + + info *uinfo = uinfos++; + uinfo->port = port; + uinfo->type = 1; + if (pthread_create(pids + 10 + i, NULL, bindUPort, uinfo) != 0) //创建线程 + { //创建线程失败 + printf("创建线程失败: %d.\n", port); + exit(0); + } + } + for (int i = 0; i < 10; i++) { + pthread_join(pids[i], NULL); + pthread_join(pids[(10 + i)], NULL); + } +} -- GitLab