Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
392cd9f7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
392cd9f7
编写于
8月 12, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
docs:modify c example code
上级
de225e6f
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
285 addition
and
55 deletion
+285
-55
docs/zh/07-develop/07-tmq.md
docs/zh/07-develop/07-tmq.md
+285
-55
未找到文件。
docs/zh/07-develop/07-tmq.md
浏览文件 @
392cd9f7
...
...
@@ -54,7 +54,7 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT
void
tmq_conf_set_auto_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
,
void
*
param
);
```
这些 API 的文档请见
[
C/C++ Connector
](
/reference/connector/cpp
)
,下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码
可以在
[
tmq.c
](
https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c
)
看到
。
这些 API 的文档请见
[
C/C++ Connector
](
/reference/connector/cpp
)
,下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码
请见下面C语言的示例代码
。
## 写入数据
...
...
@@ -65,13 +65,9 @@ drop database if exists tmqdb;
create
database
tmqdb
;
create
table
tmqdb
.
stb
(
ts
timestamp
,
c1
int
,
c2
float
,
c3
varchar
(
16
)
tags
(
t1
int
,
t3
varchar
(
16
));
create
table
tmqdb
.
ctb0
using
tmqdb
.
stb
tags
(
0
,
"subtable0"
);
create
table
tmqdb
.
ctb1
using
tmqdb
.
stb
tags
(
1
,
"subtable1"
);
create
table
tmqdb
.
ctb2
using
tmqdb
.
stb
tags
(
2
,
"subtable2"
);
create
table
tmqdb
.
ctb3
using
tmqdb
.
stb
tags
(
3
,
"subtable3"
);
create
table
tmqdb
.
ctb1
using
tmqdb
.
stb
tags
(
1
,
"subtable1"
);
insert
into
tmqdb
.
ctb0
values
(
now
,
0
,
0
,
'a0'
)(
now
+
1
s
,
0
,
0
,
'a00'
);
insert
into
tmqdb
.
ctb1
values
(
now
,
1
,
1
,
'a1'
)(
now
+
1
s
,
11
,
11
,
'a11'
);
insert
into
tmqdb
.
ctb2
values
(
now
,
2
,
2
,
'a1'
)(
now
+
1
s
,
22
,
22
,
'a22'
);
insert
into
tmqdb
.
ctb3
values
(
now
,
3
,
3
,
'a1'
)(
now
+
1
s
,
33
,
33
,
'a33'
);
```
## 创建topic:
...
...
@@ -133,7 +129,6 @@ TMQ支持多种订阅类型:
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
return
tmq
;
```
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
...
...
@@ -146,66 +141,23 @@ TMQ支持多种订阅类型:
```
sql
tmq_list_t
*
topicList
=
tmq_list_new
();
tmq_list_append
(
topicList
,
"topicName"
);
return
topicList
;
```
## 启动订阅并开始消费
```
sql
```
/* 启动订阅 */
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
/* 循环poll消息 */
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
timeOut
=
5000
;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
if
(
tmqmsg
)
{
msgCnt
++
;
totalRows
+=
msg_process
(
tmqmsg
);
taos_free_result
(
tmqmsg
);
}
else
{
break
;
}
}
fprintf
(
stderr
,
"%d msg consumed, include %d rows
\n
"
,
msgCnt
,
totalRows
);
msg_process(tmqmsg);
}
```
这里是一个
**while**
循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析:
```
sql
static
int32_t
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
int32_t
rows
=
0
;
const
char
*
topicName
=
tmq_get_topic_name
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
printf
(
"topic: %s
\n
"
,
topicName
);
printf
(
"db: %s
\n
"
,
dbName
);
printf
(
"vgroup id: %d
\n
"
,
vgroupId
);
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
rows
++
;
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"row content from %s: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"null table"
),
buf
);
}
return
rows
;
}
```
这里是一个
**while**
循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
## 结束消费
...
...
@@ -252,7 +204,285 @@ TMQ支持多种订阅类型:
<Tabs>
<TabItem
label=
"C"
value=
"c"
>
TODO
// A simple demo for asynchronous subscription.
// compile with:
// gcc -o tmq tmq.c -ltaos
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES
*
msg) {
char buf[1024];
int32_t rows = 0;
const char
*
topicName = tmq_get_topic_name(msg);
const char
*
dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s
\n
", topicName);
printf("db: %s
\n
", dbName);
printf("vgroup id: %d
\n
", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD
*
fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t
*
length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char
*
tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s
\n
", (tbName != NULL ? tbName : "table null"), buf);
}
return rows;
}
static int32_t init_env() {
TAOS
*
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES
*
pRes;
// drop database if exists
printf("create database
\n
");
pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create super table
printf("create super table
\n
");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create sub tables
printf("create sub tables
\n
");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb0, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb1, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb2, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb3, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// insert data
printf("insert data into sub tables
\n
");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic
\n
");
TAOS_RES
*
pRes;
TAOS
*
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s
\n
", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t
* tmq, int32_t code, void*
param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p
\n
", code, tmq, param);
}
tmq_t
*
build_consumer() {
tmq_conf_res_t code;
tmq_conf_t
*
conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t
*
tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t
*
build_topic_list() {
tmq_list_t
*
topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
return NULL;
}
return topicList;
}
void basic_consume_loop(tmq_t
* tmq, tmq_list_t*
topicList) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s
\n
", tmq_err2str(code));
return;
}
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (running) {
TAOS_RES
*
tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
/
*} else {*
/
/
*break;*
/
}
}
fprintf(stderr, "%d msg consumed, include %d rows
\n
", msgCnt, totalRows);
}
int main(int argc, char
*
argv[]) {
int32_t code;
if (init_env() < 0) {
return -1;
}
if (create_topic() < 0) {
return -1;
}
tmq_t
*
tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!
\n
");
return -1;
}
tmq_list_t
*
topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s
\n
", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe
\n
");
}
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s
\n
", tmq_err2str(code));
} else {
fprintf(stderr, "%% Consumer closed
\n
");
}
return 0;
}
</TabItem>
<TabItem
label=
"Java"
value=
"java"
>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录