diff --git a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
index 3782d1693dc7b7cabd7990062748ee088103dad0..d81bbd9b6db2cb77a55304430f0488afff2bf261 100644
--- a/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
+++ b/src/client/jni/com_taosdata_jdbc_TSDBJNIConnector.h
@@ -142,8 +142,8 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
* Method: consumeImp
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
*/
-JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
- (JNIEnv *, jobject, jlong, jint);
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
+ (JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c
index 2a5f7f441ed846d509f2825f452e8477db63504e..cec4737226fb6b2bb5966b63a1a5e72f0a98f71b 100644
--- a/src/client/src/TSDBJNIConnector.c
+++ b/src/client/src/TSDBJNIConnector.c
@@ -551,8 +551,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
- jboolean restart, jstring jtopic,
- jstring jsql, jint jinterval) {
+ jboolean restart, jstring jtopic, jstring jsql, jint jinterval) {
jlong sub = 0;
TAOS *taos = (TAOS *)con;
char *topic = NULL;
@@ -583,106 +582,20 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNI
return sub;
}
-static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
- jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields);
- jniTrace("created a rowdata object, rowobj:%p", rowobj);
-
- for (int i = 0; i < num_fields; i++) {
- if (row[i] == NULL) {
- continue;
- }
-
- switch (fields[i].type) {
- case TSDB_DATA_TYPE_BOOL:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetBooleanFp, i, (jboolean)(*((char *)row[i]) == 1));
- break;
- case TSDB_DATA_TYPE_TINYINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteFp, i, (jbyte) * ((char *)row[i]));
- break;
- case TSDB_DATA_TYPE_SMALLINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetShortFp, i, (jshort) * ((short *)row[i]));
- break;
- case TSDB_DATA_TYPE_INT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetIntFp, i, (jint) * (int *)row[i]);
- break;
- case TSDB_DATA_TYPE_BIGINT:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetLongFp, i, (jlong) * ((int64_t *)row[i]));
- break;
- case TSDB_DATA_TYPE_FLOAT: {
- float fv = 0;
- fv = GET_FLOAT_VAL(row[i]);
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetFloatFp, i, (jfloat)fv);
- } break;
- case TSDB_DATA_TYPE_DOUBLE: {
- double dv = 0;
- dv = GET_DOUBLE_VAL(row[i]);
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetDoubleFp, i, (jdouble)dv);
- } break;
- case TSDB_DATA_TYPE_BINARY: {
- char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
- strncpy(tmp, row[i], (size_t)fields[i].bytes); // handle the case that terminated does not exist
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp));
-
- memset(tmp, 0, (size_t)fields[i].bytes);
- break;
- }
- case TSDB_DATA_TYPE_NCHAR:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
- jniFromNCharToByteArray(env, (char *)row[i], fields[i].bytes));
- break;
- case TSDB_DATA_TYPE_TIMESTAMP:
- (*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]));
- break;
- default:
- break;
- }
- }
- return rowobj;
-}
-
-JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub,
- jint timeout) {
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
- jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp);
-
- int64_t start = taosGetTimestampMs();
- int count = 0;
- while (true) {
- TAOS_RES *res = taos_consume(tsub);
- if (res == NULL) {
- jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
- return NULL;
- }
+ TAOS_RES *res = taos_consume(tsub);
- TAOS_FIELD *fields = taos_fetch_fields(res);
- int num_fields = taos_num_fields(res);
- while (true) {
- TAOS_ROW row = taos_fetch_row(res);
- if (row == NULL) {
- break;
- }
- jobject rowobj = convert_one_row(env, row, fields, num_fields);
- (*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj);
- count++;
- }
-
- if (count > 0) {
- break;
- }
- if (timeout == -1) {
- continue;
- }
- if (((int)(taosGetTimestampMs() - start)) >= timeout) {
- jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub);
- break;
- }
+ if (res == NULL) {
+ jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
+ return 0l;
}
- return rows;
+ return (long)res;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub,
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
index cbd3523b34bed60c593024e3a0e090e1cd7dc32f..86938031f6e4a6001bca2c8f2911e6a8f0284cc4 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/DatabaseMetaDataResultSet.java
@@ -160,6 +160,7 @@ public class DatabaseMetaDataResultSet implements ResultSet {
@Override
public Timestamp getTimestamp(int columnIndex) throws SQLException {
+ columnIndex--;
return rowCursor.getTimestamp(columnIndex);
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
index 4640f6b446b6d285806408856489c5e950b37a81..062cb63cfd508800d2135494a8d45b6682a16fc1 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBConnection.java
@@ -84,6 +84,14 @@ public class TSDBConnection implements Connection {
}
}
+ public TSDBSubscribe createSubscribe() throws SQLException {
+ if (!this.connector.isClosed()) {
+ return new TSDBSubscribe(this.connector);
+ } else {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ }
+
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (!this.connector.isClosed()) {
return new TSDBPreparedStatement(this.connector, sql);
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
index 6fa9e7bc4822e898462c03099b6043bb48116281..8bb7084604851c695961564ed2f8c0142accb7fb 100755
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBJNIConnector.java
@@ -266,31 +266,31 @@ public class TSDBJNIConnector {
/**
* Subscribe to a table in TSDB
*/
- public long subscribe(String host, String user, String password, String database, String table, long time, int period) {
- return subscribeImp(host, user, password, database, table, time, period);
+ public long subscribe(String topic, String sql, boolean restart, int period) {
+ return subscribeImp(this.taos, restart, topic, sql, period);
}
- private native long subscribeImp(String host, String user, String password, String database, String table, long time, int period);
+ public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
/**
* Consume a subscribed table
*/
- public TSDBResultSetRowData consume(long subscription) {
+ public long consume(long subscription) {
return this.consumeImp(subscription);
}
- private native TSDBResultSetRowData consumeImp(long subscription);
+ private native long consumeImp(long subscription);
/**
* Unsubscribe a table
*
* @param subscription
*/
- public void unsubscribe(long subscription) {
- unsubscribeImp(subscription);
+ public void unsubscribe(long subscription, boolean isKeep) {
+ unsubscribeImp(subscription, isKeep);
}
- private native void unsubscribeImp(long subscription);
+ private native void unsubscribeImp(long subscription, boolean isKeep);
/**
* Validate if a create table sql statement is correct without actually creating that table
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
index 8acf77975630e2f652b79d6c492f98194a943931..961633b8aec4ec433a56522e504fb35495ccfa78 100644
--- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBResultSet.java
@@ -51,47 +51,47 @@ public class TSDBResultSet implements ResultSet {
private boolean lastWasNull = false;
private final int COLUMN_INDEX_START_VALUE = 1;
- public TSDBJNIConnector getJniConnector() {
- return jniConnector;
- }
+ public TSDBJNIConnector getJniConnector() {
+ return jniConnector;
+ }
- public void setJniConnector(TSDBJNIConnector jniConnector) {
- this.jniConnector = jniConnector;
- }
+ public void setJniConnector(TSDBJNIConnector jniConnector) {
+ this.jniConnector = jniConnector;
+ }
- public long getResultSetPointer() {
- return resultSetPointer;
- }
+ public long getResultSetPointer() {
+ return resultSetPointer;
+ }
- public void setResultSetPointer(long resultSetPointer) {
- this.resultSetPointer = resultSetPointer;
- }
+ public void setResultSetPointer(long resultSetPointer) {
+ this.resultSetPointer = resultSetPointer;
+ }
- public List getColumnMetaDataList() {
- return columnMetaDataList;
- }
+ public List getColumnMetaDataList() {
+ return columnMetaDataList;
+ }
- public void setColumnMetaDataList(List columnMetaDataList) {
- this.columnMetaDataList = columnMetaDataList;
- }
+ public void setColumnMetaDataList(List columnMetaDataList) {
+ this.columnMetaDataList = columnMetaDataList;
+ }
- public TSDBResultSetRowData getRowData() {
- return rowData;
- }
+ public TSDBResultSetRowData getRowData() {
+ return rowData;
+ }
- public void setRowData(TSDBResultSetRowData rowData) {
- this.rowData = rowData;
- }
+ public void setRowData(TSDBResultSetRowData rowData) {
+ this.rowData = rowData;
+ }
- public boolean isLastWasNull() {
- return lastWasNull;
- }
+ public boolean isLastWasNull() {
+ return lastWasNull;
+ }
- public void setLastWasNull(boolean lastWasNull) {
- this.lastWasNull = lastWasNull;
- }
+ public void setLastWasNull(boolean lastWasNull) {
+ this.lastWasNull = lastWasNull;
+ }
- public TSDBResultSet() {
+ public TSDBResultSet() {
}
public TSDBResultSet(TSDBJNIConnector connecter, long resultSetPointer) throws SQLException {
@@ -119,7 +119,7 @@ public class TSDBResultSet implements ResultSet {
public boolean next() throws SQLException {
if (rowData != null) {
- this.rowData.clear();
+ this.rowData.clear();
}
int code = this.jniConnector.fetchRow(this.resultSetPointer, this.rowData);
@@ -154,119 +154,119 @@ public class TSDBResultSet implements ResultSet {
public String getString(int columnIndex) throws SQLException {
String res = null;
int colIndex = getTrueColumnIndex(columnIndex);
-
- this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+
+ this.lastWasNull = this.rowData.wasNull(colIndex);
+ if (!lastWasNull) {
+ res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public boolean getBoolean(int columnIndex) throws SQLException {
- boolean res = false;
+ boolean res = false;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = this.rowData.getBoolean(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public byte getByte(int columnIndex) throws SQLException {
- byte res = 0;
+ byte res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = (byte) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public short getShort(int columnIndex) throws SQLException {
- short res = 0;
+ short res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
if (!lastWasNull) {
- res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ res = (short) this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public int getInt(int columnIndex) throws SQLException {
- int res = 0;
+ int res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getInt(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public long getLong(int columnIndex) throws SQLException {
- long res = 0l;
+ long res = 0l;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public float getFloat(int columnIndex) throws SQLException {
- float res = 0;
- int colIndex = getTrueColumnIndex(columnIndex);
+ float res = 0;
+ int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getFloat(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
public double getDouble(int columnIndex) throws SQLException {
- double res = 0;
+ double res = 0;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
- }
+ if (!lastWasNull) {
+ res = this.rowData.getDouble(colIndex, this.columnMetaDataList.get(colIndex).getColType());
+ }
return res;
}
/*
* (non-Javadoc)
- *
+ *
* @see java.sql.ResultSet#getBigDecimal(int, int)
- *
+ *
* @deprecated Use {@code getBigDecimal(int columnIndex)} or {@code
* getBigDecimal(String columnLabel)}
*/
@Deprecated
public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException {
- BigDecimal res = null;
+ BigDecimal res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
- }
+ if (!lastWasNull) {
+ res = new BigDecimal(this.rowData.getLong(colIndex, this.columnMetaDataList.get(colIndex).getColType()));
+ }
return res;
}
public byte[] getBytes(int columnIndex) throws SQLException {
- byte[] res = null;
+ byte[] res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
- }
+ if (!lastWasNull) {
+ res = this.rowData.getString(colIndex, this.columnMetaDataList.get(colIndex).getColType()).getBytes();
+ }
return res;
}
@@ -281,13 +281,13 @@ public class TSDBResultSet implements ResultSet {
}
public Timestamp getTimestamp(int columnIndex) throws SQLException {
- Timestamp res = null;
+ Timestamp res = null;
int colIndex = getTrueColumnIndex(columnIndex);
this.lastWasNull = this.rowData.wasNull(colIndex);
- if (!lastWasNull) {
- res = this.rowData.getTimestamp(colIndex);
- }
+ if (!lastWasNull) {
+ res = this.rowData.getTimestamp(colIndex);
+ }
return res;
}
@@ -297,9 +297,9 @@ public class TSDBResultSet implements ResultSet {
/*
* (non-Javadoc)
- *
+ *
* @see java.sql.ResultSet#getUnicodeStream(int)
- *
+ *
* * @deprecated use getCharacterStream
in place of
* getUnicodeStream
*/
@@ -409,13 +409,13 @@ public class TSDBResultSet implements ResultSet {
}
public int findColumn(String columnLabel) throws SQLException {
- Iterator colMetaDataIt = this.columnMetaDataList.iterator();
- while (colMetaDataIt.hasNext()) {
- ColumnMetaData colMetaData = colMetaDataIt.next();
- if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
- return colMetaData.getColIndex() + 1;
- }
- }
+ Iterator colMetaDataIt = this.columnMetaDataList.iterator();
+ while (colMetaDataIt.hasNext()) {
+ ColumnMetaData colMetaData = colMetaDataIt.next();
+ if (colMetaData.getColName() != null && colMetaData.getColName().equalsIgnoreCase(columnLabel)) {
+ return colMetaData.getColIndex() + 1;
+ }
+ }
throw new SQLException(TSDBConstants.INVALID_VARIABLES);
}
@@ -882,7 +882,7 @@ public class TSDBResultSet implements ResultSet {
}
public String getNString(int columnIndex) throws SQLException {
- int colIndex = getTrueColumnIndex(columnIndex);
+ int colIndex = getTrueColumnIndex(columnIndex);
return (String) rowData.get(colIndex);
}
@@ -1017,17 +1017,17 @@ public class TSDBResultSet implements ResultSet {
public T getObject(String columnLabel, Class type) throws SQLException {
throw new SQLException(TSDBConstants.UNSUPPORT_METHOD_EXCEPTIONZ_MSG);
}
-
+
private int getTrueColumnIndex(int columnIndex) throws SQLException {
if (columnIndex < this.COLUMN_INDEX_START_VALUE) {
throw new SQLException("Column Index out of range, " + columnIndex + " < " + this.COLUMN_INDEX_START_VALUE);
}
-
+
int numOfCols = this.columnMetaDataList.size();
if (columnIndex > numOfCols) {
throw new SQLException("Column Index out of range, " + columnIndex + " > " + numOfCols);
}
-
+
return columnIndex - 1;
}
}
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java
new file mode 100644
index 0000000000000000000000000000000000000000..3b479aafc35a50e85e42d9cda1e93c2d68f8e115
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribe.java
@@ -0,0 +1,185 @@
+/***************************************************************************
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *****************************************************************************/
+package com.taosdata.jdbc;
+
+import javax.management.OperationsException;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+
+public class TSDBSubscribe {
+ private TSDBJNIConnector connecter = null;
+ private static ScheduledExecutorService pool;
+ private static Map timerTaskMap = new ConcurrentHashMap<>();
+ private static Map scheduledMap = new ConcurrentHashMap();
+
+ private static class TimerInstance {
+ private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1);
+ }
+
+ public static ScheduledExecutorService getTimerInstance() {
+ return TimerInstance.instance;
+ }
+
+ public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException {
+ if (null != connecter) {
+ this.connecter = connecter;
+ } else {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ }
+
+ /**
+ * sync subscribe
+ *
+ * @param topic
+ * @param sql
+ * @param restart
+ * @param period
+ * @throws SQLException
+ */
+ public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ if (period < 1000) {
+ throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES));
+ }
+ return this.connecter.subscribe(topic, sql, restart, period);
+ }
+
+ /**
+ * async subscribe
+ *
+ * @param topic
+ * @param sql
+ * @param restart
+ * @param period
+ * @param callBack
+ * @throws SQLException
+ */
+ public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ final long subscription = this.connecter.subscribe(topic, sql, restart, period);
+ if (null != callBack) {
+ pool = getTimerInstance();
+
+ TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack);
+
+ timerTaskMap.put(subscription, timerTask);
+
+ ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS);
+ scheduledMap.put(subscription, scheduledFuture);
+ }
+ return subscription;
+ }
+
+ public TSDBResultSet consume(long subscription) throws OperationsException, SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+ if (0 == subscription) {
+ throw new OperationsException("Invalid use of consume");
+ }
+ long resultSetPointer = this.connecter.consume(subscription);
+
+ if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ } else if (resultSetPointer == TSDBConstants.JNI_NULL_POINTER) {
+ return null;
+ } else {
+ return new TSDBResultSet(this.connecter, resultSetPointer);
+ }
+ }
+
+ /**
+ * cancel subscribe
+ *
+ * @param subscription
+ * @param isKeep
+ * @throws SQLException
+ */
+ public void unsubscribe(long subscription, boolean isKeep) throws SQLException {
+ if (this.connecter.isClosed()) {
+ throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
+ }
+
+ if (null != timerTaskMap.get(subscription)) {
+ synchronized (timerTaskMap.get(subscription)) {
+ while (1 == timerTaskMap.get(subscription).getState()) {
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ timerTaskMap.get(subscription).setState(2);
+ if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) {
+ timerTaskMap.get(subscription).cancel();
+ timerTaskMap.remove(subscription);
+ scheduledMap.get(subscription).cancel(false);
+ scheduledMap.remove(subscription);
+ }
+ this.connecter.unsubscribe(subscription, isKeep);
+ }
+ } else {
+ this.connecter.unsubscribe(subscription, isKeep);
+ }
+ }
+
+ class TSDBTimerTask extends TimerTask {
+ private long subscription;
+ private TSDBSubscribeCallBack callBack;
+ // 0: not running 1: running 2: cancel
+ private int state = 0;
+
+ public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) {
+ this.subscription = subscription;
+ this.callBack = callBack;
+ }
+
+ public int getState() {
+ return this.state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (2 == state) {
+ return;
+ }
+
+ state = 1;
+
+ try {
+ TSDBResultSet resultSet = consume(subscription);
+ callBack.invoke(resultSet);
+ } catch (Exception e) {
+ this.cancel();
+ throw new RuntimeException(e);
+ }
+ state = 0;
+ }
+ }
+ }
+}
+
diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java
new file mode 100644
index 0000000000000000000000000000000000000000..c1b9b02fa85f8e2197f41d75f8f0c25b3f4c416c
--- /dev/null
+++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBSubscribeCallBack.java
@@ -0,0 +1,19 @@
+/***************************************************************************
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *****************************************************************************/
+package com.taosdata.jdbc;
+
+public interface TSDBSubscribeCallBack {
+ void invoke(TSDBResultSet resultSet);
+}
diff --git a/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java
new file mode 100644
index 0000000000000000000000000000000000000000..5b2b6367ec5fe7cb46f0514b59931d8942f9bc74
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/TestAsyncTSDBSubscribe.java
@@ -0,0 +1,83 @@
+import com.taosdata.jdbc.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+public class TestAsyncTSDBSubscribe {
+ public static void main(String[] args) {
+ String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " +
+ "-tname tableName -h host";
+ if (args.length < 2) {
+ System.err.println(usage);
+ return;
+ }
+
+ String dbName = "";
+ String tName = "";
+ String host = "localhost";
+ String topic = "";
+ for (int i = 0; i < args.length; i++) {
+ if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ dbName = args[++i];
+ }
+ if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ tName = args[++i];
+ }
+ if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ host = args[++i];
+ }
+ if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ topic = args[++i];
+ }
+ }
+ if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
+ System.err.println(usage);
+ return;
+ }
+
+ Connection connection = null;
+ TSDBSubscribe subscribe = null;
+ long subscribId = 0;
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties);
+ String rawSql = "select * from " + tName + ";";
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
+ long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second"));
+ int a = 0;
+ Thread.sleep(2000);
+ subscribe.unsubscribe(subscribId, true);
+ System.err.println("cancel subscribe");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static class CallBack implements TSDBSubscribeCallBack {
+ private String name = "";
+
+ public CallBack(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void invoke(TSDBResultSet resultSet) {
+ try {
+ while (null !=resultSet && resultSet.next()) {
+ System.out.print("callback_" + name + ": ");
+ for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resultSet.getString(i) + "\t");
+ }
+ System.out.println();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/src/connector/jdbc/src/test/java/TestPreparedStatement.java b/src/connector/jdbc/src/test/java/TestPreparedStatement.java
index 2e2cc0ede6ae88c9b43c4890894378f13a3cc14a..3b84645b5b0f250c453615d0afbd1f018bbe523f 100644
--- a/src/connector/jdbc/src/test/java/TestPreparedStatement.java
+++ b/src/connector/jdbc/src/test/java/TestPreparedStatement.java
@@ -10,9 +10,9 @@ public class TestPreparedStatement {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
- properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "192.168.1.117");
- Connection connection = DriverManager.getConnection("jdbc:TAOS://192.168.1.117:0/?user=root&password=taosdata", properties);
- String rawSql = "SELECT ts, c1 FROM (select c1, ts from db.tb1) SUB_QRY";
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, "localhost");
+ Connection connection = DriverManager.getConnection("jdbc:TAOS://localhost:0/?user=root&password=taosdata", properties);
+ String rawSql = "select * from test.log0601";
// String[] params = new String[]{"ts", "c1"};
PreparedStatement pstmt = (TSDBPreparedStatement) connection.prepareStatement(rawSql);
ResultSet resSet = pstmt.executeQuery();
diff --git a/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java
new file mode 100644
index 0000000000000000000000000000000000000000..f12924c8a61a3dbeeec56f7d2c712472d771e8e7
--- /dev/null
+++ b/src/connector/jdbc/src/test/java/TestTSDBSubscribe.java
@@ -0,0 +1,83 @@
+import com.taosdata.jdbc.TSDBConnection;
+import com.taosdata.jdbc.TSDBDriver;
+import com.taosdata.jdbc.TSDBResultSet;
+import com.taosdata.jdbc.TSDBSubscribe;
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+public class TestTSDBSubscribe {
+ public static void main(String[] args) throws Exception {
+ String usage = "java -cp taos-jdbcdriver-1.0.3_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " +
+ "-topic topicName -tname tableName -h host";
+ if (args.length < 2) {
+ System.err.println(usage);
+ return;
+ }
+
+ String dbName = "";
+ String tName = "";
+ String host = "localhost";
+ String topic = "";
+ for (int i = 0; i < args.length; i++) {
+ if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ dbName = args[++i];
+ }
+ if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ tName = args[++i];
+ }
+ if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ host = args[++i];
+ }
+ if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
+ topic = args[++i];
+ }
+ }
+ if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
+ System.err.println(usage);
+ return;
+ }
+
+ Connection connection = null;
+ TSDBSubscribe subscribe = null;
+ long subscribId = 0;
+ try {
+ Class.forName("com.taosdata.jdbc.TSDBDriver");
+ Properties properties = new Properties();
+ properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
+ connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata"
+ , properties);
+ String rawSql = "select * from " + tName + ";";
+ subscribe = ((TSDBConnection) connection).createSubscribe();
+ subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
+ int a = 0;
+ while (true) {
+ Thread.sleep(900);
+ TSDBResultSet resSet = subscribe.consume(subscribId);
+
+ while (resSet.next()) {
+ for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
+ System.out.printf(i + ": " + resSet.getString(i) + "\t");
+ }
+ System.out.println("\n======" + a + "==========");
+ }
+
+ a++;
+ if (a >= 10) {
+ break;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (null != subscribe && 0 != subscribId) {
+ subscribe.unsubscribe(subscribId, true);
+ }
+ if (null != connection) {
+ connection.close();
+ }
+ }
+ }
+}
diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c
index 69400816295a155de26f6f0175180931096471aa..5e81d77489fa8a49e1593c663c88d2eaa053cfb0 100644
--- a/src/dnode/src/dnodeMain.c
+++ b/src/dnode/src/dnodeMain.c
@@ -38,6 +38,7 @@ static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
+static int dnodeCreateDir(const char *dir);
typedef struct {
const char *const name;
@@ -59,6 +60,16 @@ static const SDnodeComponent SDnodeComponents[] = {
{"shell", dnodeInitShell, dnodeCleanupShell}
};
+static int dnodeCreateDir(const char *dir) {
+ struct stat dirstat;
+ if (stat(dir, &dirstat) < 0) {
+ if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup();
@@ -87,9 +98,9 @@ int32_t dnodeInitSystem() {
taosSetCoreDump();
signal(SIGPIPE, SIG_IGN);
- struct stat dirstat;
- if (stat(tsLogDir, &dirstat) < 0) {
- mkdir(tsLogDir, 0755);
+ if (dnodeCreateDir(tsLogDir) < 0) {
+ printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
+ return -1;
}
char temp[TSDB_FILENAME_LEN];
@@ -140,7 +151,11 @@ static void dnodeCheckDataDirOpenned(char *dir) {
char filepath[256] = {0};
sprintf(filepath, "%s/.running", dir);
- int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
+ int fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
+ if (fd < 0) {
+ dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno));
+ exit(0);
+ }
int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
if (ret != 0) {
dError("failed to lock file:%s ret:%d, database may be running, quit", filepath, ret);
@@ -150,16 +165,28 @@ static void dnodeCheckDataDirOpenned(char *dir) {
}
static int32_t dnodeInitStorage() {
- struct stat dirstat;
- if (stat(tsDataDir, &dirstat) < 0) {
- mkdir(tsDataDir, 0755);
+ if (dnodeCreateDir(tsDataDir) < 0) {
+ dError("failed to create dir: %s, reason: %s", tsDataDir, strerror(errno));
+ return -1;
}
-
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
- mkdir(tsVnodeDir, 0755);
- mkdir(tsDnodeDir, 0755);
+
+ //TODO(dengyihao): no need to init here
+ if (dnodeCreateDir(tsMnodeDir) < 0) {
+ dError("failed to create dir: %s, reason: %s", tsMnodeDir, strerror(errno));
+ return -1;
+ }
+ //TODO(dengyihao): no need to init here
+ if (dnodeCreateDir(tsVnodeDir) < 0) {
+ dError("failed to create dir: %s, reason: %s", tsVnodeDir, strerror(errno));
+ return -1;
+ }
+ if (dnodeCreateDir(tsDnodeDir) < 0) {
+ dError("failed to create dir: %s, reason: %s", tsDnodeDir, strerror(errno));
+ return -1;
+ }
dnodeCheckDataDirOpenned(tsDnodeDir);
diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c
new file mode 100644
index 0000000000000000000000000000000000000000..65b866a99b302a83472eff54367682dcd42b749c
--- /dev/null
+++ b/src/kit/taosnetwork/client.c
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define BUFFER_SIZE 200
+
+typedef struct {
+ int port;
+ char *host[15];
+} info;
+
+void *checkPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ char *host = *pinfo->host;
+ int clientSocket;
+
+ struct sockaddr_in serverAddr;
+ char sendbuf[BUFFER_SIZE];
+ char recvbuf[BUFFER_SIZE];
+ int iDataNum;
+ if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_port = htons(port);
+
+ serverAddr.sin_addr.s_addr = inet_addr(host);
+
+ printf("=================================\n");
+ if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+ printf("Connect to: %s:%d...success\n", host, port);
+
+ sprintf(sendbuf, "send port_%d", port);
+ send(clientSocket, sendbuf, strlen(sendbuf), 0);
+ printf("Send msg_%d: %s\n", port, sendbuf);
+
+ recvbuf[0] = '\0';
+ iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0);
+ recvbuf[iDataNum] = '\0';
+ printf("Read ack msg_%d: %s\n", port, recvbuf);
+
+ printf("=================================\n");
+ close(clientSocket);
+ return NULL;
+}
+
+void *checkUPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ char *host = *pinfo->host;
+ int clientSocket;
+
+ struct sockaddr_in serverAddr;
+ char sendbuf[BUFFER_SIZE];
+ char recvbuf[BUFFER_SIZE];
+ int iDataNum;
+ if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+ serverAddr.sin_family = AF_INET;
+ serverAddr.sin_port = htons(port);
+
+ serverAddr.sin_addr.s_addr = inet_addr(host);
+
+ printf("=================================\n");
+
+ sprintf(sendbuf, "send msg port_%d by udp", port);
+
+ socklen_t sin_size = sizeof(*(struct sockaddr*)&serverAddr);
+
+ sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size);
+
+ printf("Send msg_%d by udp: %s\n", port, sendbuf);
+
+ recvbuf[0] = '\0';
+ iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
+ recvbuf[iDataNum] = '\0';
+ printf("Read ack msg_%d from udp: %s\n", port, recvbuf);
+
+ printf("=================================\n");
+ close(clientSocket);
+ return NULL;
+}
+
+int main() {
+ int port = 6020;
+ char *host = "127.0.0.1";
+ info *tinfo = malloc(sizeof(info));
+ info *uinfo = malloc(sizeof(info));
+
+ for (size_t i = 0; i < 30; i++) {
+ port++;
+ printf("For test: %s:%d\n", host, port);
+
+ *tinfo->host = host;
+ tinfo->port = port;
+ checkPort(tinfo);
+
+ *uinfo->host = host;
+ uinfo->port = port;
+ checkUPort(uinfo);
+ }
+ free(tinfo);
+ free(uinfo);
+}
\ No newline at end of file
diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c
new file mode 100644
index 0000000000000000000000000000000000000000..7dcc9cbeda70d3c51fd678138f1273071d7a7e7f
--- /dev/null
+++ b/src/kit/taosnetwork/server.c
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define BUFFER_SIZE 200
+
+typedef struct {
+ int port;
+ int type; // 0: tcp, 1: udo, default: 0
+} info;
+
+static void *bindPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ int type = pinfo->type;
+ int serverSocket;
+
+ struct sockaddr_in server_addr;
+ struct sockaddr_in clientAddr;
+ int addr_len = sizeof(clientAddr);
+ int client;
+ char buffer[BUFFER_SIZE];
+ int iDataNum;
+
+ if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+
+ bzero(&server_addr, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(port);
+ server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+
+ if (listen(serverSocket, 5) < 0) {
+ perror("listen");
+ return NULL;
+ }
+
+ printf("Bind port: %d success\n", port);
+ while (1) {
+ client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
+ if (client < 0) {
+ perror("accept");
+ continue;
+ }
+ printf("=================================\n");
+
+ printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
+ while (1) {
+ buffer[0] = '\0';
+ iDataNum = recv(client, buffer, BUFFER_SIZE, 0);
+
+ if (iDataNum < 0) {
+ perror("recv null");
+ continue;
+ }
+ if (iDataNum > 0) {
+ buffer[iDataNum] = '\0';
+ printf("read msg:%s\n", buffer);
+ if (strcmp(buffer, "quit") == 0) break;
+ buffer[0] = '\0';
+
+ sprintf(buffer, "ack port_%d", port);
+ printf("send ack msg:%s\n", buffer);
+
+ send(client, buffer, strlen(buffer), 0);
+ break;
+ }
+ }
+ printf("=================================\n");
+ }
+ close(serverSocket);
+ return NULL;
+}
+
+static void *bindUPort(void *sarg) {
+ info *pinfo = (info *)sarg;
+ int port = pinfo->port;
+ int type = pinfo->type;
+ int serverSocket;
+
+ struct sockaddr_in server_addr;
+ struct sockaddr_in clientAddr;
+ int addr_len = sizeof(clientAddr);
+ int client;
+ char buffer[BUFFER_SIZE];
+ int iDataNum;
+
+ if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+ perror("socket");
+ return NULL;
+ }
+
+ bzero(&server_addr, sizeof(server_addr));
+ server_addr.sin_family = AF_INET;
+ server_addr.sin_port = htons(port);
+ server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
+ perror("connect");
+ return NULL;
+ }
+
+ socklen_t sin_size;
+ printf("Bind port: %d success\n", port);
+
+ while (1) {
+ buffer[0] = '\0';
+
+ sin_size = sizeof(*(struct sockaddr *)&server_addr);
+
+ iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
+
+ if (iDataNum < 0) {
+ perror("recvfrom null");
+ continue;
+ }
+ if (iDataNum > 0) {
+ printf("=================================\n");
+
+ printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port);
+ buffer[iDataNum] = '\0';
+ printf("Read msg from udp:%s\n", buffer);
+ if (strcmp(buffer, "quit") == 0) break;
+ buffer[0] = '\0';
+
+ sprintf(buffer, "ack port_%d by udp", port);
+ printf("Send ack msg by udp:%s\n", buffer);
+
+ sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size);
+
+ send(client, buffer, strlen(buffer), 0);
+ printf("=================================\n");
+ }
+ }
+
+ close(serverSocket);
+ return NULL;
+}
+
+
+int main() {
+ int port = 6020;
+ pthread_t *pids = malloc(60 * sizeof(pthread_t));
+ info * infos = malloc(30 * sizeof(info));
+ info * uinfos = malloc(30 * sizeof(info));
+
+ for (size_t i = 0; i < 30; i++) {
+ port++;
+
+ info *pinfo = infos++;
+ pinfo->port = port;
+
+ if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程
+ { //创建线程失败
+ printf("创建线程失败: %d.\n", port);
+ exit(0);
+ }
+
+ info *uinfo = uinfos++;
+ uinfo->port = port;
+ uinfo->type = 1;
+ if (pthread_create(pids + 30 + i, NULL, bindUPort, uinfo) != 0) //创建线程
+ { //创建线程失败
+ printf("创建线程失败: %d.\n", port);
+ exit(0);
+ }
+ }
+ for (int i = 0; i < 30; i++) {
+ pthread_join(pids[i], NULL);
+ pthread_join(pids[(10 + i)], NULL);
+ }
+}
diff --git a/src/kit/taosnetwork/taosnetwork_client.c b/src/kit/taosnetwork/taosnetwork_client.c
new file mode 100644
index 0000000000000000000000000000000000000000..072610a0357211e2412b6a07f9621cac32e80c50
--- /dev/null
+++ b/src/kit/taosnetwork/taosnetwork_client.c
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#define SERVER_PORT 8000
+#define SIZE 200
+
+int main() {
+ struct sockaddr_in servaddr, cliaddr;
+ socklen_t cliaddr_len;
+ int client_sockfd;
+ char buf[SIZE];
+ char recvbuf[SIZE];
+
+ int i, n, flag = 0;
+
+ int len, iDataNum;
+
+ client_sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ bzero(&servaddr, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(SERVER_PORT);
+
+ if (connect(client_sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
+ printf("Connected error..\n");
+ return 0;
+ }
+ printf("Connected to server..\n");
+
+ /*循环的发送接收信息并打印接收信息(可以按需发送)--recv返回接收到的字节数,send返回发送的字节数*/
+ while (1) {
+ printf("Enter string to send:");
+ scanf("%s", buf);
+ if (!strcmp(buf, "quit")) {
+ break;
+ }
+ len = (sizeof buf);
+
+ recvbuf[0] = '\0';
+
+ iDataNum = recv(client_sockfd, recvbuf, SIZE, 0);
+
+ recvbuf[iDataNum] = '\0';
+
+ printf("%s\n", recvbuf);
+ }
+ return 0;
+}
diff --git a/src/kit/taosnetwork/taosnetwork_server.c b/src/kit/taosnetwork/taosnetwork_server.c
new file mode 100644
index 0000000000000000000000000000000000000000..1ec20716fa49bd01eeca7116bb5bd33c76f5886c
--- /dev/null
+++ b/src/kit/taosnetwork/taosnetwork_server.c
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#define SERVER_PORT 8000
+#define SIZE 200
+
+int main() {
+ struct sockaddr_in servaddr, cliaddr;
+ socklen_t cliaddr_len;
+ int listenfd, connfd;
+ char buf[BUFSIZ];
+ int i, n, flag = 0;
+
+ listenfd = socket(AF_INET, SOCK_STREAM, 0);
+ bzero(&servaddr, sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(SERVER_PORT);
+ bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
+ listen(listenfd, 20);
+
+ printf("Accepting connections..\n");
+ while (1) {
+ cliaddr_len = sizeof(cliaddr);
+ connfd = accept(listenfd, (struct sockaddr *)&cliaddr,
+ &cliaddr_len); //如果得不到客户端发来的消息,将会被阻塞,一直等到消息到来
+ n = read(connfd, buf, SIZE); //如果n<=0,表示客户端已断开
+ while (1) {
+ if (n != 0) {
+ for (i = 0; i < n; i++) printf("%c", buf[i]); //输出客户端发来的信息
+ } else {
+ printf("Client say close the connection..\n");
+ break;
+ }
+ n = read(connfd, buf, SIZE);
+ }
+ close(connfd);
+ }
+}
diff --git a/src/kit/taosnetwork/test_client.c b/src/kit/taosnetwork/test_client.c
new file mode 100644
index 0000000000000000000000000000000000000000..0c863c6a999cc7c3e8e7e12ec088fd3c2a1f2b2c
--- /dev/null
+++ b/src/kit/taosnetwork/test_client.c
@@ -0,0 +1,50 @@
+#include
+#include
+#include
+#include
+#include
+
+#define SERVER_PORT 8888
+#define BUFF_LEN 512
+#define SERVER_IP "172.0.5.182"
+
+void udp_msg_sender(int fd, struct sockaddr* dst) {}
+
+/*
+ client:
+ socket-->sendto-->revcfrom-->close
+*/
+
+int main(int argc, char* argv[]) {
+ int client_fd;
+ struct sockaddr_in ser_addr;
+
+ client_fd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (client_fd < 0) {
+ printf("create socket fail!\n");
+ return -1;
+ }
+
+ memset(&ser_addr, 0, sizeof(ser_addr));
+ ser_addr.sin_family = AF_INET;
+ // ser_addr.sin_addr.s_addr = inet_addr(SERVER_IP);
+ ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); //注意网络序转换
+ ser_addr.sin_port = htons(SERVER_PORT); //注意网络序转换
+
+ socklen_t len;
+ struct sockaddr_in src;
+ while (1) {
+ char buf[BUFF_LEN] = "TEST UDP MSG!\n";
+ len = sizeof(*(struct sockaddr*)&ser_addr);
+ printf("client:%s\n", buf); //打印自己发送的信息
+ sendto(client_fd, buf, BUFF_LEN, 0, (struct sockaddr*)&ser_addr, len);
+ memset(buf, 0, BUFF_LEN);
+ recvfrom(client_fd, buf, BUFF_LEN, 0, (struct sockaddr*)&src, &len); //接收来自server的信息
+ printf("server:%s\n", buf);
+ sleep(1); //一秒发送一次消息
+ }
+
+ close(client_fd);
+
+ return 0;
+}
\ No newline at end of file
diff --git a/src/kit/taosnetwork/test_server.c b/src/kit/taosnetwork/test_server.c
new file mode 100644
index 0000000000000000000000000000000000000000..3bfbaa4f7c746854e2d243712a0f663402bddaaf
--- /dev/null
+++ b/src/kit/taosnetwork/test_server.c
@@ -0,0 +1,63 @@
+#include
+#include
+#include
+#include
+#include
+
+#define SERVER_PORT 8888
+#define BUFF_LEN 1024
+
+void handle_udp_msg(int fd) {
+ char buf[BUFF_LEN]; //接收缓冲区,1024字节
+ socklen_t len;
+ int count;
+ struct sockaddr_in clent_addr; // clent_addr用于记录发送方的地址信息
+ while (1) {
+ memset(buf, 0, BUFF_LEN);
+ len = sizeof(clent_addr);
+ count =
+ recvfrom(fd, buf, BUFF_LEN, 0, (struct sockaddr*)&clent_addr, &len); // recvfrom是拥塞函数,没有数据就一直拥塞
+ if (count == -1) {
+ printf("recieve data fail!\n");
+ return;
+ }
+ printf("client:%s\n", buf); //打印client发过来的信息
+ memset(buf, 0, BUFF_LEN);
+ sprintf(buf, "I have recieved %d bytes data!\n", count); //回复client
+ printf("server:%s\n", buf); //打印自己发送的信息给
+ sendto(fd, buf, BUFF_LEN, 0, (struct sockaddr*)&clent_addr,
+ len); //发送信息给client,注意使用了clent_addr结构体指针
+ }
+}
+
+/*
+ server:
+ socket-->bind-->recvfrom-->sendto-->close
+*/
+
+int main(int argc, char* argv[]) {
+ int server_fd, ret;
+ struct sockaddr_in ser_addr;
+
+ server_fd = socket(AF_INET, SOCK_DGRAM, 0); // AF_INET:IPV4;SOCK_DGRAM:UDP
+ if (server_fd < 0) {
+ printf("create socket fail!\n");
+ return -1;
+ }
+
+ memset(&ser_addr, 0, sizeof(ser_addr));
+ ser_addr.sin_family = AF_INET;
+ ser_addr.sin_addr.s_addr = htonl(INADDR_ANY); // IP地址,需要进行网络序转换,INADDR_ANY:本地地址
+ ser_addr.sin_port = htons(SERVER_PORT); //端口号,需要网络序转换
+
+ ret = bind(server_fd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
+ if (ret < 0) {
+ printf("socket bind fail!\n");
+ return -1;
+ }
+
+ handle_udp_msg(server_fd); //处理接收到的数据
+
+ close(server_fd);
+ return 0;
+}
\ No newline at end of file
diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh
index bf3523dc0f2a46f2895dbf90f92d0da505940615..99dc658d71b7524c1880deef5eb551f833c39a7d 100755
--- a/tests/pytest/fulltest.sh
+++ b/tests/pytest/fulltest.sh
@@ -131,6 +131,6 @@ python3 ./test.py -f user/pass_len.py
#query
python3 ./test.py -f query/filter.py
-python3 ./test.py $1 -f query/filterCombo.py
-python3 ./test.py $1 -f query/queryNormal.py
-python3 ./test.py $1 -f query/queryError.py
+python3 ./test.py -f query/filterCombo.py
+python3 ./test.py -f query/queryNormal.py
+python3 ./test.py -f query/queryError.py
diff --git a/tests/test-all.sh b/tests/test-all.sh
index 7bde4da67a9fa4eb5ac9dcea8071acff8228d39b..cd5444858ebf87246bca0849fe2cfeb522032d2b 100755
--- a/tests/test-all.sh
+++ b/tests/test-all.sh
@@ -1,5 +1,22 @@
#!/bin/bash
+function runSimCaseOneByOne {
+ while read -r line; do
+ if [[ $line =~ ^run.* ]]; then
+ case=`echo $line | awk '{print $2}'`
+ ./test.sh -f $case 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee -a out.log
+ fi
+ done < $1
+}
+
+function runPyCaseOneByOne {
+ while read -r line; do
+ if [[ $line =~ ^python.* ]]; then
+ $line 2>&1 | grep 'successfully executed\|failed\|fault' | grep -v 'default'| tee -a pytest-out.log
+ fi
+ done < $1
+}
+
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
@@ -9,10 +26,13 @@ NC='\033[0m'
echo "### run TSIM script ###"
cd script
+
+[ -f out.log ] && rm -f out.log
+
if [ "$1" == "cron" ]; then
- ./test.sh -f fullGeneralSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.log
+ runSimCaseOneByOne fullGeneralSuite.sim
else
- ./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.log
+ runSimCaseOneByOne basicSuite.sim
fi
totalSuccess=`grep 'success' out.log | wc -l`
@@ -36,10 +56,12 @@ fi
echo "### run Python script ###"
cd ../pytest
+[ -f pytest-out.log ] && rm -f pytest-out.log
+
if [ "$1" == "cron" ]; then
- ./fulltest.sh 2>&1 | grep 'successfully executed\|failed\|fault' | grep -v 'default'| tee pytest-out.log
+ runPyCaseOneByOne fulltest.sh
else
- ./smoketest.sh 2>&1 | grep 'successfully executed\|failed\|fault' | grep -v 'default'| tee pytest-out.log
+ runPyCaseOneByOne smoketest.sh
fi
totalPySuccess=`grep 'successfully executed' pytest-out.log | wc -l`