Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
612749e8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
612749e8
编写于
7月 09, 2021
作者:
T
tickduan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
subscribe.c is modify by self restore with develop
上级
d2a6904c
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
232 addition
and
110 deletion
+232
-110
tests/examples/c/subscribe.c
tests/examples/c/subscribe.c
+232
-110
未找到文件。
tests/examples/c/subscribe.c
浏览文件 @
612749e8
...
@@ -7,135 +7,257 @@
...
@@ -7,135 +7,257 @@
#include <taos.h> // include TDengine header file
#include <taos.h> // include TDengine header file
#include <unistd.h>
#include <unistd.h>
int
nTotalRows
;
void
showme
();
void
print_result
(
TAOS_RES
*
res
,
int
blockFetch
)
{
float
calculate_delta_t
(
size_t
size
);
TAOS_ROW
row
=
NULL
;
int
is_lossless_compressed_data
(
unsigned
char
*
compressedBytes
,
size_t
cmpSize
);
int
num_fields
=
taos_num_fields
(
res
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
res
);
#include <stdio.h>
int
nRows
=
0
;
#include <stdlib.h>
#include <string.h>
if
(
blockFetch
)
{
#include <inttypes.h>
nRows
=
taos_fetch_block
(
res
,
&
row
);
#include <taos.h> // TAOS header file
//for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields);
static
void
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
// puts(buf);
//}
printf
(
"aaa"
);
}
else
{
/*
while
((
row
=
taos_fetch_row
(
res
)))
{
int i;
char
buf
[
4096
]
=
{
0
};
TAOS_RES *pSql = NULL;
taos_print_row
(
buf
,
row
,
fields
,
num_fields
);
int32_t code = -1;
puts
(
buf
);
nRows
++
;
for (i = 0; i < 5; i++) {
if (NULL != pSql) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
}
}
if (code != 0) {
nTotalRows
+=
nRows
;
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
printf
(
"%d rows consumed.
\n
"
,
nRows
);
taos_free_result(pSql);
}
taos_close(taos);
exit(EXIT_FAILURE);
void
subscribe_callback
(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
)
{
print_result
(
res
,
*
(
int
*
)
param
);
}
void
check_row_count
(
int
line
,
TAOS_RES
*
res
,
int
expected
)
{
int
actual
=
0
;
TAOS_ROW
row
;
while
((
row
=
taos_fetch_row
(
res
)))
{
actual
++
;
}
}
if
(
actual
!=
expected
)
{
printf
(
"line %d: row count mismatch, expected: %d, actual: %d
\n
"
,
line
,
expected
,
actual
);
}
else
{
printf
(
"line %d: %d rows consumed as expected
\n
"
,
line
,
actual
);
}
}
void
do_query
(
TAOS
*
taos
,
const
char
*
sql
)
{
TAOS_RES
*
res
=
taos_query
(
taos
,
sql
);
taos_free_result
(
res
);
}
void
run_test
(
TAOS
*
taos
)
{
do_query
(
taos
,
"drop database if exists test;"
);
usleep
(
100000
);
do_query
(
taos
,
"create database test;"
);
usleep
(
100000
);
do_query
(
taos
,
"use test;"
);
usleep
(
100000
);
do_query
(
taos
,
"create table meters(ts timestamp, a int) tags(area int);"
);
do_query
(
taos
,
"create table t0 using meters tags(0);"
);
do_query
(
taos
,
"create table t1 using meters tags(1);"
);
do_query
(
taos
,
"create table t2 using meters tags(2);"
);
do_query
(
taos
,
"create table t3 using meters tags(3);"
);
do_query
(
taos
,
"create table t4 using meters tags(4);"
);
do_query
(
taos
,
"create table t5 using meters tags(5);"
);
do_query
(
taos
,
"create table t6 using meters tags(6);"
);
do_query
(
taos
,
"create table t7 using meters tags(7);"
);
do_query
(
taos
,
"create table t8 using meters tags(8);"
);
do_query
(
taos
,
"create table t9 using meters tags(9);"
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:00:00.000', 0);"
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:01:00.000', 0);"
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:02:00.000', 0);"
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:00:00.000', 0);"
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:01:00.000', 0);"
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:02:00.000', 0);"
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.000', 0);"
);
do_query
(
taos
,
"insert into t2 values('2020-01-01 00:00:00.000', 0);"
);
do_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:00.000', 0);"
);
do_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:01.000', 0);"
);
do_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t3 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t4 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t5 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t6 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t7 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t8 values('2020-01-01 00:01:02.000', 0);"
);
do_query
(
taos
,
"insert into t9 values('2020-01-01 00:01:02.000', 0);"
);
// super tables subscription
usleep
(
1000000
);
TAOS_SUB
*
tsub
=
taos_subscribe
(
taos
,
0
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
TAOS_RES
*
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
18
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
0
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:02:00.001', 0);"
);
do_query
(
taos
,
"insert into t8 values('2020-01-01 00:01:03.000', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
2
);
do_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:02.001', 0);"
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.001', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
2
);
do_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.002', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
1
);
// keep progress information and restart subscription
taos_unsubscribe
(
tsub
,
1
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.000', 0);"
);
tsub
=
taos_subscribe
(
taos
,
1
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
24
);
taos_free_result(pSql);
// keep progress information and continue previous subscription
*/
taos_unsubscribe
(
tsub
,
1
);
tsub
=
taos_subscribe
(
taos
,
0
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
0
);
// don't keep progress information and continue previous subscription
taos_unsubscribe
(
tsub
,
0
);
tsub
=
taos_subscribe
(
taos
,
0
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
24
);
// single meter subscription
taos_unsubscribe
(
tsub
,
0
);
tsub
=
taos_subscribe
(
taos
,
0
,
"test"
,
"select * from t0;"
,
NULL
,
NULL
,
0
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
5
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
0
);
do_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.001', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
1
);
taos_unsubscribe
(
tsub
,
0
);
}
}
void
Test
(
TAOS
*
taos
,
char
*
qstr
,
int
i
);
int
main
(
int
argc
,
char
*
argv
[])
{
int
main
(
int
argc
,
char
*
argv
[])
{
//char qstr[1024];
const
char
*
host
=
"127.0.0.1"
;
const
char
*
user
=
"root"
;
is_lossless_compressed_data
(
NULL
,
0
);
const
char
*
passwd
=
"taosdata"
;
const
char
*
sql
=
"select * from meters;"
;
// connect to server
const
char
*
topic
=
"test-multiple"
;
if
(
argc
<
2
)
{
int
async
=
1
,
restart
=
0
,
keep
=
1
,
test
=
0
,
blockFetch
=
0
;
printf
(
"please input server-ip
\n
"
);
return
0
;
for
(
int
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strncmp
(
argv
[
i
],
"-h="
,
3
)
==
0
)
{
host
=
argv
[
i
]
+
3
;
continue
;
}
if
(
strncmp
(
argv
[
i
],
"-u="
,
3
)
==
0
)
{
user
=
argv
[
i
]
+
3
;
continue
;
}
if
(
strncmp
(
argv
[
i
],
"-p="
,
3
)
==
0
)
{
passwd
=
argv
[
i
]
+
3
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-sync"
)
==
0
)
{
async
=
0
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-restart"
)
==
0
)
{
restart
=
1
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-single"
)
==
0
)
{
sql
=
"select * from t0;"
;
topic
=
"test-single"
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-nokeep"
)
==
0
)
{
keep
=
0
;
continue
;
}
if
(
strncmp
(
argv
[
i
],
"-sql="
,
5
)
==
0
)
{
sql
=
argv
[
i
]
+
5
;
topic
=
"test-custom"
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-test"
)
==
0
)
{
test
=
1
;
continue
;
}
if
(
strcmp
(
argv
[
i
],
"-block-fetch"
)
==
0
)
{
blockFetch
=
1
;
continue
;
}
}
}
TAOS
*
taos
=
taos_connect
(
argv
[
1
],
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
taos
=
taos_connect
(
host
,
user
,
passwd
,
""
,
0
);
if
(
taos
==
NULL
)
{
if
(
taos
==
NULL
)
{
printf
(
"failed to connect to
server, reason:%s
\n
"
,
"null taos"
/*taos_errstr(taos)*/
);
printf
(
"failed to connect to
db, reason:%s
\n
"
,
taos_errstr
(
taos
)
);
exit
(
1
);
exit
(
1
);
}
}
/*
for (int i = 0; i < 100; i++) {
if
(
test
)
{
Test(taos, qstr, i);
run_test
(
taos
);
taos_close
(
taos
);
exit
(
0
);
}
}
taos_close(taos);
taos_cleanup();
taos_select_db
(
taos
,
"test"
);
*/
TAOS_SUB
*
tsub
=
NULL
;
}
if
(
async
)
{
void
Test
(
TAOS
*
taos
,
char
*
qstr
,
int
index
)
{
// create an asynchronized subscription, the callback function will be called every 1s
tsub
=
taos_subscribe
(
taos
,
restart
,
topic
,
sql
,
subscribe_callback
,
&
blockFetch
,
1000
);
printf
(
"==================test at %d
\n
================================"
,
index
);
}
else
{
// create an synchronized subscription, need to call 'taos_consume' manually
queryDB
(
taos
,
"drop database if exists demo"
);
tsub
=
taos_subscribe
(
taos
,
restart
,
topic
,
sql
,
NULL
,
NULL
,
0
);
queryDB
(
taos
,
"create database demo"
);
}
//TAOS_RES *result;
queryDB
(
taos
,
"use demo"
);
if
(
tsub
==
NULL
)
{
printf
(
"failed to create subscription.
\n
"
);
queryDB
(
taos
,
"create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"
);
exit
(
0
);
printf
(
"success to create table
\n
"
);
}
/*
if
(
async
)
{
int i = 0;
getchar
();
for (i = 0; i < 10; ++i) {
}
else
while
(
1
)
{
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello");
TAOS_RES
*
res
=
taos_consume
(
tsub
);
printf("qstr: %s\n", qstr);
if
(
res
==
NULL
)
{
printf
(
"failed to consume data."
);
// note: how do you wanna do if taos_query returns non-NULL
break
;
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
}
else
{
}
else
{
printf("insert row: %i\n", i);
print_result
(
res
,
blockFetch
);
getchar
();
}
}
taos_free_result(result1);
}
printf("success to insert rows, total %d rows\n", i);
// query the records
sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result);
exit(1);
}
}
TAOS_ROW row;
printf
(
"total rows consumed: %d
\n
"
,
nTotalRows
);
int rows = 0;
taos_unsubscribe
(
tsub
,
keep
);
int num_fields = taos_field_count(result);
taos_close
(
taos
);
TAOS_FIELD *fields = taos_fetch_fields(result);
printf("num_fields = %d\n", num_fields);
printf("select * from table, result:\n");
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
}
taos_free_result(result);
return
0
;
printf("====demo end====\n\n");
*/
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录