未验证 提交 c71eae67 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3051 from taosdata/patch/td-1089

update java subscription interface
......@@ -460,6 +460,49 @@ while(resultSet.next()){
> 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
### 订阅
#### 创建
```java
TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false);
```
`subscribe` 方法的三个参数含义如下:
* topic:订阅的主题(即名称),此参数是订阅的唯一标识
* sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
* restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic' 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
#### 消费数据
```java
int total = 0;
while(true) {
TSDBResultSet rs = sub.consume();
int count = 0;
while(rs.next()) {
count++;
}
total += count;
System.out.printf("%d rows consumed, total %d\n", count, total);
Thread.sleep(1000);
}
```
`consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的`Thread.sleep(1000)`),否则会给服务端造成不必要的压力。
#### 关闭订阅
```java
sub.close(true);
```
`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
### 关闭资源
```java
......
......@@ -84,12 +84,17 @@ public class TSDBConnection implements Connection {
}
}
public TSDBSubscribe createSubscribe() throws SQLException {
if (!this.connector.isClosed()) {
return new TSDBSubscribe(this.connector);
} else {
public TSDBSubscribe subscribe(String topic, String sql, boolean restart) throws SQLException {
if (this.connector.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
long id = this.connector.subscribe(topic, sql, restart, 0);
if (id == 0) {
throw new SQLException(TSDBConstants.WrapErrMsg("failed to create subscription"));
}
return new TSDBSubscribe(this.connector, id);
}
public PreparedStatement prepareStatement(String sql) throws SQLException {
......
......@@ -254,29 +254,29 @@ public class TSDBJNIConnector {
private native int closeConnectionImp(long connection);
/**
* Subscribe to a table in TSDB
* Create a subscription
*/
public long subscribe(String topic, String sql, boolean restart, int period) {
long subscribe(String topic, String sql, boolean restart, int period) {
return subscribeImp(this.taos, restart, topic, sql, period);
}
public native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
private native long subscribeImp(long connection, boolean restart, String topic, String sql, int period);
/**
* Consume a subscribed table
* Consume a subscription
*/
public long consume(long subscription) {
return this.consumeImp(subscription);
long consume(long subscription) {
return this.consumeImp(subscription);
}
private native long consumeImp(long subscription);
/**
* Unsubscribe a table
* Unsubscribe, close a subscription
*
* @param subscription
*/
public void unsubscribe(long subscription, boolean isKeep) {
void unsubscribe(long subscription, boolean isKeep) {
unsubscribeImp(subscription, isKeep);
}
......
......@@ -22,81 +22,28 @@ import java.util.concurrent.*;
public class TSDBSubscribe {
private TSDBJNIConnector connecter = null;
private static ScheduledExecutorService pool;
private static Map<Long, TSDBTimerTask> timerTaskMap = new ConcurrentHashMap<>();
private static Map<Long, ScheduledFuture> scheduledMap = new ConcurrentHashMap();
private long id = 0;
private static class TimerInstance {
private static final ScheduledExecutorService instance = Executors.newScheduledThreadPool(1);
}
public static ScheduledExecutorService getTimerInstance() {
return TimerInstance.instance;
}
public TSDBSubscribe(TSDBJNIConnector connecter) throws SQLException {
TSDBSubscribe(TSDBJNIConnector connecter, long id) throws SQLException {
if (null != connecter) {
this.connecter = connecter;
this.id = id;
} else {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
}
/**
* sync subscribe
*
* @param topic
* @param sql
* @param restart
* @param period
* @throws SQLException
*/
public long subscribe(String topic, String sql, boolean restart, int period) throws SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (period < 1000) {
throw new SQLException(TSDBConstants.WrapErrMsg(TSDBConstants.INVALID_VARIABLES));
}
return this.connecter.subscribe(topic, sql, restart, period);
}
/**
* async subscribe
* consume
*
* @param topic
* @param sql
* @param restart
* @param period
* @param callBack
* @throws SQLException
* @throws OperationsException, SQLException
*/
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException {
public TSDBResultSet consume() throws OperationsException, SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
final long subscription = this.connecter.subscribe(topic, sql, restart, period);
if (null != callBack) {
pool = getTimerInstance();
TSDBTimerTask timerTask = new TSDBTimerTask(subscription, callBack);
timerTaskMap.put(subscription, timerTask);
ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(timerTask, 1, 1000, TimeUnit.MILLISECONDS);
scheduledMap.put(subscription, scheduledFuture);
}
return subscription;
}
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (0 == subscription) {
throw new OperationsException("Invalid use of consume");
}
long resultSetPointer = this.connecter.consume(subscription);
long resultSetPointer = this.connecter.consume(this.id);
if (resultSetPointer == TSDBConstants.JNI_CONNECTION_NULL) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
......@@ -108,77 +55,16 @@ public class TSDBSubscribe {
}
/**
* cancel subscribe
* close subscription
*
* @param subscription
* @param isKeep
* @param keepProgress
* @throws SQLException
*/
public void unsubscribe(long subscription, boolean isKeep) throws SQLException {
public void close(boolean keepProgress) throws SQLException {
if (this.connecter.isClosed()) {
throw new SQLException(TSDBConstants.FixErrMsg(TSDBConstants.JNI_CONNECTION_NULL));
}
if (null != timerTaskMap.get(subscription)) {
synchronized (timerTaskMap.get(subscription)) {
while (1 == timerTaskMap.get(subscription).getState()) {
try {
Thread.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
timerTaskMap.get(subscription).setState(2);
if (!timerTaskMap.isEmpty() && timerTaskMap.containsKey(subscription)) {
timerTaskMap.get(subscription).cancel();
timerTaskMap.remove(subscription);
scheduledMap.get(subscription).cancel(false);
scheduledMap.remove(subscription);
}
this.connecter.unsubscribe(subscription, isKeep);
}
} else {
this.connecter.unsubscribe(subscription, isKeep);
}
}
class TSDBTimerTask extends TimerTask {
private long subscription;
private TSDBSubscribeCallBack callBack;
// 0: not running 1: running 2: cancel
private int state = 0;
public TSDBTimerTask(long subscription, TSDBSubscribeCallBack callBack) {
this.subscription = subscription;
this.callBack = callBack;
}
public int getState() {
return this.state;
}
public void setState(int state) {
this.state = state;
}
@Override
public void run() {
synchronized (this) {
if (2 == state) {
return;
}
state = 1;
try {
callBack.invoke(consume(subscription));
} catch (Exception e) {
this.cancel();
throw new RuntimeException(e);
}
state = 0;
}
}
this.connecter.unsubscribe(this.id, keepProgress);
}
}
import com.taosdata.jdbc.*;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
public class TestAsyncTSDBSubscribe {
public static void main(String[] args) throws SQLException {
String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName -topic topicName " +
"-tname tableName -h host";
if (args.length < 2) {
System.err.println(usage);
return;
}
String dbName = "";
String tName = "";
String host = "localhost";
String topic = "";
for (int i = 0; i < args.length; i++) {
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
dbName = args[++i];
}
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
tName = args[++i];
}
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
}
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
System.err.println(usage);
return;
}
Connection connection = null;
long subscribId = 0;
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata", properties);
String rawSql = "select * from " + tName + ";";
TSDBSubscribe subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
long subscribId2 = subscribe.subscribe("test", rawSql, false, 1000, new CallBack("second"));
int a = 0;
Thread.sleep(2000);
subscribe.unsubscribe(subscribId, true);
System.err.println("cancel subscribe");
} catch (Exception e) {
e.printStackTrace();
if (null != connection && !connection.isClosed()) {
connection.close();
}
}
}
private static class CallBack implements TSDBSubscribeCallBack {
private String name = "";
public CallBack(String name) {
this.name = name;
}
@Override
public void invoke(TSDBResultSet resultSet) {
try {
while (null !=resultSet && resultSet.next()) {
System.out.print("callback_" + name + ": ");
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
System.out.printf(i + ": " + resultSet.getString(i) + "\t");
}
System.out.println();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
......@@ -2,82 +2,76 @@ import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
public class TestTSDBSubscribe {
public static TSDBConnection connectTDengine(String host, String database) throws Exception {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String cs = String.format("jdbc:TAOS://%s:0/%s?user=root&password=taosdata", host, database);
return (TSDBConnection)DriverManager.getConnection(cs, properties);
}
public static void main(String[] args) throws Exception {
String usage = "java -cp taos-jdbcdriver-2.0.0_dev-dist.jar com.taosdata.jdbc.TSDBSubscribe -db dbName " +
"-topic topicName -tname tableName -h host";
String usage = "java -Djava.ext.dirs=../ TestTSDBSubscribe [-host host] <-db database> <-topic topic> <-sql sql>";
if (args.length < 2) {
System.err.println(usage);
return;
}
String dbName = "";
String tName = "";
String host = "localhost";
String topic = "";
String host = "localhost", database = "", topic = "", sql = "";
for (int i = 0; i < args.length; i++) {
if ("-db".equalsIgnoreCase(args[i]) && i < args.length - 1) {
dbName = args[++i];
database = args[++i];
}
if ("-tname".equalsIgnoreCase(args[i]) && i < args.length - 1) {
tName = args[++i];
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
if ("-h".equalsIgnoreCase(args[i]) && i < args.length - 1) {
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
if ("-sql".equalsIgnoreCase(args[i]) && i < args.length - 1) {
sql = args[++i];
}
}
if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tName) || StringUtils.isEmpty(topic)) {
System.err.println(usage);
return;
if (database.isEmpty() || topic.isEmpty() || sql.isEmpty()) {
System.err.println(usage);
return;
}
Connection connection = null;
TSDBSubscribe subscribe = null;
long subscribId = 0;
TSDBConnection connection = null;
TSDBSubscribe sub = null;
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + dbName + "?user=root&password=taosdata"
, properties);
String rawSql = "select * from " + tName + ";";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
int a = 0;
TSDBResultSet resSet = null;
while (true) {
Thread.sleep(900);
resSet = subscribe.consume(subscribId);
connection = connectTDengine(host, database);
sub = ((TSDBConnection) connection).subscribe(topic, sql, false);
while (resSet.next()) {
for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
System.out.printf(i + ": " + resSet.getString(i) + "\t");
}
System.out.println("\n======" + a + "==========");
}
a++;
if (a >= 10) {
break;
int total = 0;
while(true) {
TSDBResultSet rs = sub.consume();
int count = 0;
while(rs.next()) {
count++;
}
total += count;
System.out.printf("%d rows consumed, total %d\n", count, total);
Thread.sleep(900);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != subscribe && 0 != subscribId) {
subscribe.unsubscribe(subscribId, true);
if (null != sub) {
sub.close(true);
}
if (null != connection) {
connection.close();
......
package com.taosdata.jdbc;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import static org.junit.Assert.assertTrue;
public class AsyncSubscribeTest extends BaseTest {
Connection connection = null;
Statement statement = null;
String dbName = "test";
String tName = "t0";
String host = "localhost";
String topic = "test";
long subscribId = 0;
@Before
public void createDatabase() throws SQLException {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
return;
}
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":0/" + "?user=root&password=taosdata"
, properties);
statement = connection.createStatement();
statement.executeUpdate("create database if not exists " + dbName);
statement.executeUpdate("create table if not exists " + dbName + "." + tName + " (ts timestamp, k int, v int)");
long ts = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
ts += i;
String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")";
statement.executeUpdate(sql);
}
}
@Test
public void subscribe() throws Exception {
TSDBSubscribe subscribe = null;
try {
String rawSql = "select * from " + dbName + "." + tName + ";";
System.out.println(rawSql);
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000, new CallBack("first"));
assertTrue(subscribId > 0);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
subscribe.unsubscribe(subscribId, true);
}
private static class CallBack implements TSDBSubscribeCallBack {
private String name = "";
public CallBack(String name) {
this.name = name;
}
@Override
public void invoke(TSDBResultSet resultSet) {
try {
while (null != resultSet && resultSet.next()) {
System.out.print("callback_" + name + ": ");
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
System.out.printf(i + ": " + resultSet.getString(i) + "\t");
}
System.out.println();
}
resultSet.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@After
public void close() throws Exception {
statement.executeQuery("drop database test");
statement.close();
connection.close();
Thread.sleep(10);
}
}
\ No newline at end of file
......@@ -49,20 +49,16 @@ public class SubscribeTest extends BaseTest {
@Test
public void subscribe() throws Exception {
TSDBSubscribe subscribe = null;
long subscribId = 0;
try {
String rawSql = "select * from " + dbName + "." + tName + ";";
System.out.println(rawSql);
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
assertTrue(subscribId > 0);
subscribe = ((TSDBConnection) connection).subscribe(topic, rawSql, false);
int a = 0;
while (true) {
Thread.sleep(900);
TSDBResultSet resSet = subscribe.consume(subscribId);
TSDBResultSet resSet = subscribe.consume();
while (resSet.next()) {
for (int i = 1; i <= resSet.getMetaData().getColumnCount(); i++) {
......@@ -79,8 +75,8 @@ public class SubscribeTest extends BaseTest {
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != subscribe && 0 != subscribId) {
subscribe.unsubscribe(subscribId, true);
if (null != subscribe) {
subscribe.close(true);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册