Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
29d0beb3
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
29d0beb3
编写于
8月 09, 2022
作者:
W
wade zhang
提交者:
GitHub
8月 09, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15856 from taosdata/docs/TD-17892
docs(driver): jdbc add tmq sample code
上级
57e16a27
c07097ab
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
104 addition
and
32 deletion
+104
-32
docs/zh/14-reference/03-connector/java.mdx
docs/zh/14-reference/03-connector/java.mdx
+97
-25
source/client/src/TSDBJNIConnector.c
source/client/src/TSDBJNIConnector.c
+7
-7
未找到文件。
docs/zh/14-reference/03-connector/java.mdx
浏览文件 @
29d0beb3
...
...
@@ -670,55 +670,127 @@ public class SchemalessInsertTest {
TDengine Java 连接器支持订阅功能,应用 API 如下:
#### 创建
订阅
#### 创建
Topic
```java
TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false);
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.executeUpdate("create topic if not exists topic_speed as select ts, speed from speed_table");
```
`subscribe` 方法的三个参数含义如下:
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
- topic_speed:订阅的主题(即名称),此参数是订阅的唯一标识。
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据。
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic` 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
如上面的例子将使用 SQL 语句 `select ts, speed from speed_table` 创建一个名为 `topic_speed` 的订阅。
#### 创建 Consumer
```java
Properties config = new Properties();
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
TaosConsumer consumer = new TaosConsumer<>(config);
```
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
- 其他参数请参考:[Consumer 参数列表](/develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
```java
int total = 0;
while(true) {
TSDBResultSet rs = sub.consume();
int count = 0;
while(rs.next()) {
count++;
}
total += count;
System.out.printf("%d rows consumed, total %d\n", count, total);
Thread.sleep(1000);
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
```
`
consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的 `Thread.sleep(10
00)`),否则会给服务端造成不必要的压力。
`
poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(1
00)`),否则会给服务端造成不必要的压力。
#### 关闭订阅
```java
sub.close(true);
consumer.close()
```
`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
### 关闭资源
### 使用示例如下:
```java
resultSet.close();
stmt.close();
conn.close();
```
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("group.id", "group1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ResultBean record : records) {
process(record);
}
}
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
> `注意务必要将 connection 进行关闭`,否则会出现连接泄露。
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}
```
### 与连接池使用
...
...
source/client/src/TSDBJNIConnector.c
浏览文件 @
29d0beb3
...
...
@@ -757,7 +757,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
int32_t
code
=
taos_stmt_prepare
(
pStmt
,
str
,
len
);
taosMemoryFreeClear
(
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
jniError
(
"
prepareStmt
jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
@@ -785,7 +785,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
jname
,
name
);
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tsconn
,
tstrerror
(
code
));
jniError
(
"
bindTableName
jobj:%p, conn:%p, code:%s"
,
jobj
,
tsconn
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
@@ -860,7 +860,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
(
*
env
)
->
ReleaseStringUTFChars
(
env
,
tableName
,
name
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tsconn
,
tstrerror
(
code
));
jniError
(
"
tableNameTags
jobj:%p, conn:%p, code:%s"
,
jobj
,
tsconn
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
return
JNI_SUCCESS
;
...
...
@@ -926,7 +926,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
taosMemoryFreeClear
(
b
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
jniError
(
"
bindColData
jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
@@ -949,7 +949,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEn
int32_t
code
=
taos_stmt_add_batch
(
pStmt
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
jniError
(
"
add batch
jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
@@ -973,7 +973,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
int32_t
code
=
taos_stmt_execute
(
pStmt
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
jniError
(
"
excute batch
jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
@@ -997,7 +997,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
int32_t
code
=
taos_stmt_close
(
pStmt
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
jniError
(
"jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
jniError
(
"
close stmt
jobj:%p, conn:%p, code:%s"
,
jobj
,
tscon
,
tstrerror
(
code
));
return
JNI_TDENGINE_ERROR
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录