Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d0f420d4
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d0f420d4
编写于
6月 08, 2022
作者:
M
Minglei Jin
提交者:
GitHub
6月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12055 from taosdata/test/TS-1472
test: demo for produce message to TQ and consume message from TQ
上级
425b831e
da969b44
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
117 addition
and
1 deletion
+117
-1
examples/JDBC/JDBCDemo/pom.xml
examples/JDBC/JDBCDemo/pom.xml
+1
-1
examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/TQDemo.java
...C/JDBCDemo/src/main/java/com/taosdata/example/TQDemo.java
+116
-0
未找到文件。
examples/JDBC/JDBCDemo/pom.xml
浏览文件 @
d0f420d4
...
@@ -17,7 +17,7 @@
...
@@ -17,7 +17,7 @@
<dependency>
<dependency>
<groupId>
com.taosdata.jdbc
</groupId>
<groupId>
com.taosdata.jdbc
</groupId>
<artifactId>
taos-jdbcdriver
</artifactId>
<artifactId>
taos-jdbcdriver
</artifactId>
<version>
2.0.3
6
</version>
<version>
2.0.3
8
</version>
</dependency>
</dependency>
</dependencies>
</dependencies>
...
...
examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/TQDemo.java
0 → 100644
浏览文件 @
d0f420d4
package
com.taosdata.example
;
import
com.taosdata.jdbc.TSDBConnection
;
import
com.taosdata.jdbc.TSDBResultSet
;
import
com.taosdata.jdbc.TSDBSubscribe
;
import
java.sql.*
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.concurrent.TimeUnit
;
public
class
TQDemo
{
private
static
final
String
host
=
"192.168.56.105"
;
private
static
final
String
topic
=
"test_tq"
;
public
static
void
main
(
String
[]
args
)
{
try
(
Connection
conn
=
getConnection
())
{
createTopic
(
conn
);
Thread
producer
=
new
Thread
(
new
Producer
(
conn
,
topic
,
1
));
producer
.
start
();
Thread
consumer
=
new
Thread
(
new
Consumer
(
conn
,
topic
,
1
),
"Consumer"
);
consumer
.
start
();
producer
.
join
();
consumer
.
join
();
}
catch
(
SQLException
|
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
private
static
Connection
getConnection
()
throws
SQLException
{
final
String
jdbcUrl
=
"jdbc:TAOS://"
+
host
+
":6030/"
;
return
DriverManager
.
getConnection
(
jdbcUrl
,
"root"
,
"taosdata"
);
}
private
static
void
createTopic
(
Connection
conn
)
throws
SQLException
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
stmt
.
execute
(
"drop topic if exists "
+
topic
);
stmt
.
execute
(
"create topic if not exists "
+
topic
+
" partitions 1"
);
}
}
private
static
class
Producer
implements
Runnable
{
private
final
Connection
conn
;
private
final
String
topic
;
private
final
int
partitionIndex
;
private
Producer
(
Connection
conn
,
String
topic
,
int
partitionIndex
)
{
this
.
conn
=
conn
;
this
.
topic
=
topic
;
this
.
partitionIndex
=
partitionIndex
;
}
@Override
public
void
run
()
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
stmt
.
execute
(
"insert into "
+
topic
+
".p"
+
partitionIndex
+
" (off, ts, content) values(0, now, 'abcdefg')"
);
TimeUnit
.
SECONDS
.
sleep
(
1
);
}
}
catch
(
SQLException
|
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
}
private
static
class
Consumer
implements
Runnable
{
private
static
final
SimpleDateFormat
formator
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss.SSS"
);
private
final
Connection
conn
;
private
final
String
topic
;
private
final
int
partitionIndex
;
private
Consumer
(
Connection
conn
,
String
topic
,
int
partitionIndex
)
{
this
.
conn
=
conn
;
this
.
topic
=
topic
;
this
.
partitionIndex
=
partitionIndex
;
}
@Override
public
void
run
()
{
try
{
System
.
out
.
println
(
Thread
.
currentThread
().
getName
()
+
" started"
);
TSDBConnection
tsdbConn
=
conn
.
unwrap
(
TSDBConnection
.
class
);
final
String
sql
=
"select * from "
+
topic
+
".p"
+
partitionIndex
;
TSDBSubscribe
subscribe
=
tsdbConn
.
subscribe
(
topic
,
sql
,
false
);
for
(
int
count
=
0
;
true
;
)
{
TSDBResultSet
rs
=
subscribe
.
consume
();
while
(
rs
.
next
())
{
long
offset
=
rs
.
getLong
(
"off"
);
Timestamp
ts
=
rs
.
getTimestamp
(
"ts"
);
String
content
=
rs
.
getString
(
"content"
);
System
.
out
.
println
(
Thread
.
currentThread
().
getName
()
+
" >>> offset: "
+
offset
+
", ts: "
+
formator
.
format
(
new
Date
(
ts
.
getTime
()))
+
", content: "
+
content
);
count
++;
}
if
(
count
==
10
)
break
;
}
System
.
out
.
println
(
Thread
.
currentThread
().
getName
()
+
" stopped"
);
}
catch
(
SQLException
e
)
{
e
.
printStackTrace
();
}
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录