Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ed137b36
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ed137b36
编写于
8月 30, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
opti: test casese for tmq snapshot for taosX
上级
1238365c
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
226 addition
and
1233 deletion
+226
-1233
examples/c/CMakeLists.txt
examples/c/CMakeLists.txt
+0
-15
examples/c/tmq_taosx.c
examples/c/tmq_taosx.c
+0
-489
tests/system-test/7-tmq/tmq_taosx.py
tests/system-test/7-tmq/tmq_taosx.py
+76
-89
tests/test/c/CMakeLists.txt
tests/test/c/CMakeLists.txt
+0
-8
tests/test/c/tmq_taosx_ci.c
tests/test/c/tmq_taosx_ci.c
+150
-120
tests/test/c/tmq_taosx_snapshot_ci.c
tests/test/c/tmq_taosx_snapshot_ci.c
+0
-512
未找到文件。
examples/c/CMakeLists.txt
浏览文件 @
ed137b36
...
...
@@ -13,15 +13,9 @@ IF (TD_LINUX)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable
(
tmq
""
)
add_executable
(
tmq_taosx
""
)
add_executable
(
stream_demo
""
)
add_executable
(
demoapi
""
)
target_sources
(
tmq_taosx
PRIVATE
"tmq_taosx.c"
)
target_sources
(
tmq
PRIVATE
"tmq.c"
...
...
@@ -41,10 +35,6 @@ IF (TD_LINUX)
taos_static
)
target_link_libraries
(
tmq_taosx
taos_static
)
target_link_libraries
(
stream_demo
taos_static
)
...
...
@@ -57,10 +47,6 @@ IF (TD_LINUX)
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_include_directories
(
tmq_taosx
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_include_directories
(
stream_demo
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
...
...
@@ -73,7 +59,6 @@ IF (TD_LINUX)
)
SET_TARGET_PROPERTIES
(
tmq PROPERTIES OUTPUT_NAME tmq
)
SET_TARGET_PROPERTIES
(
tmq_taosx PROPERTIES OUTPUT_NAME tmq_taosx
)
SET_TARGET_PROPERTIES
(
stream_demo PROPERTIES OUTPUT_NAME stream_demo
)
SET_TARGET_PROPERTIES
(
demoapi PROPERTIES OUTPUT_NAME demoapi
)
ENDIF
()
...
...
examples/c/tmq_taosx.c
已删除
100644 → 0
浏览文件 @
1238365c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
NULL
;
}
taos_free_result
(
pRes
);
return
pConn
;
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
TAOS
*
pConn
=
use_db
();
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
tmq_free_json_meta
(
result
);
}
tmq_raw_data
raw
=
{
0
};
tmq_get_raw
(
msg
,
&
raw
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database if exists db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 4"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833600, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(1626006833600, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct4 using st1(t3) tags('ct4')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct4, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add column c4 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 modify column c3 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 select * from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add tag t2 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table ct3 set tag t1=5000"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to slter child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"delete from abc1 .ct3 where ts < 1626006833606"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table ct3 ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 add column c3 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 modify column c2 nchar(8)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 rename column c3 cc3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table n1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt1 using jt tags('{
\"
k1
\"
:1,
\"
k2
\"
:
\"
hello
\"
}')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
printf
(
"env init
\n
"
);
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
}
tests/system-test/7-tmq/tmq_taosx.py
浏览文件 @
ed137b36
...
...
@@ -20,15 +20,9 @@ class TDTestCase:
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
checkFileContent
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -c %s'
%
(
buildPath
,
cfgPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
srcFile
=
'%s/../log/tmq_taosx_tmp.source'
%
(
cfgPath
)
dstFile
=
'%s/../log/tmq_taosx_tmp.result'
%
(
cfgPath
)
def
checkJson
(
self
,
cfgPath
,
name
):
srcFile
=
'%s/../log/%s.source'
%
(
cfgPath
,
name
)
dstFile
=
'%s/../log/%s.result'
%
(
cfgPath
,
name
)
tdLog
.
info
(
"compare file: %s, %s"
%
(
srcFile
,
dstFile
))
consumeFile
=
open
(
srcFile
,
mode
=
'r'
)
...
...
@@ -43,7 +37,31 @@ class TDTestCase:
tdLog
.
exit
(
"compare error: %s != %s"
%
src
,
dst
)
else
:
break
return
def
checkDropData
(
self
):
tdSql
.
execute
(
'use db_taosx'
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jt order by i"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
11
)
tdSql
.
checkData
(
0
,
2
,
'{"k1":1,"k2":"hello"}'
)
tdSql
.
checkData
(
1
,
2
,
None
)
tdSql
.
execute
(
'use abc1'
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
"select * from jt order by i"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
11
)
tdSql
.
checkData
(
0
,
2
,
'{"k1":1,"k2":"hello"}'
)
tdSql
.
checkData
(
1
,
2
,
None
)
return
def
checkData
(
self
):
tdSql
.
execute
(
'use db_taosx'
)
tdSql
.
query
(
"select * from ct3 order by c1 desc"
)
tdSql
.
checkRows
(
2
)
...
...
@@ -116,113 +134,82 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
2
,
None
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
2
,
'{"k1":1,"k2":"hello"}'
)
tdSql
.
execute
(
'drop topic if exists topic_ctb_column'
)
return
def
check
FileContentSnapshot
(
self
):
def
check
Wal1Vgroup
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_
snapshot_ci -c %s
'
%
(
buildPath
,
cfgPath
)
cmdStr
=
'%s/build/bin/tmq_taosx_
ci -c %s -sv 1 -dv 1
'
%
(
buildPath
,
cfgPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
srcFile
=
'%s/../log/tmq_taosx_tmp_snapshot.source'
%
(
cfgPath
)
dstFile
=
'%s/../log/tmq_taosx_tmp_snapshot.result'
%
(
cfgPath
)
tdLog
.
info
(
"compare file: %s, %s"
%
(
srcFile
,
dstFile
))
consumeFile
=
open
(
srcFile
,
mode
=
'r'
)
queryFile
=
open
(
dstFile
,
mode
=
'r'
)
while
True
:
dst
=
queryFile
.
readline
()
src
=
consumeFile
.
readline
()
self
.
checkJson
(
cfgPath
,
"tmq_taosx_tmp"
)
self
.
checkData
()
if
dst
:
if
dst
!=
src
:
tdLog
.
exit
(
"compare error: %s != %s"
%
src
,
dst
)
else
:
break
return
tdSql
.
execute
(
'use db_taosx'
)
tdSql
.
query
(
"select * from ct3 order by c1 desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
51
)
tdSql
.
checkData
(
0
,
4
,
940
)
tdSql
.
checkData
(
1
,
1
,
23
)
tdSql
.
checkData
(
1
,
4
,
None
)
def
checkWalMultiVgroups
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -sv 3 -dv 5'
%
(
buildPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
tdSql
.
query
(
"select * from st1 order by ts"
)
tdSql
.
checkRows
(
8
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
3
)
tdSql
.
checkData
(
4
,
1
,
4
)
tdSql
.
checkData
(
6
,
1
,
23
)
self
.
checkData
()
tdSql
.
checkData
(
0
,
2
,
2
)
tdSql
.
checkData
(
1
,
2
,
4
)
tdSql
.
checkData
(
4
,
2
,
3
)
tdSql
.
checkData
(
6
,
2
,
32
)
return
tdSql
.
checkData
(
0
,
3
,
'a'
)
tdSql
.
checkData
(
1
,
3
,
'b'
)
tdSql
.
checkData
(
4
,
3
,
'hwj'
)
tdSql
.
checkData
(
6
,
3
,
's21ds'
)
def
checkWalMultiVgroupsWithDropTable
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'
%
(
buildPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
tdSql
.
checkData
(
0
,
4
,
None
)
tdSql
.
checkData
(
1
,
4
,
None
)
tdSql
.
checkData
(
5
,
4
,
940
)
tdSql
.
checkData
(
6
,
4
,
None
)
self
.
checkDropData
()
tdSql
.
checkData
(
0
,
5
,
1000
)
tdSql
.
checkData
(
1
,
5
,
2000
)
tdSql
.
checkData
(
4
,
5
,
1000
)
tdSql
.
checkData
(
6
,
5
,
5000
)
return
tdSql
.
checkData
(
0
,
6
,
'ttt'
)
tdSql
.
checkData
(
1
,
6
,
None
)
tdSql
.
checkData
(
4
,
6
,
'ttt'
)
tdSql
.
checkData
(
6
,
6
,
None
)
def
checkSnapshot1Vgroup
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s'
%
(
buildPath
,
cfgPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
tdSql
.
checkData
(
0
,
7
,
True
)
tdSql
.
checkData
(
1
,
7
,
None
)
tdSql
.
checkData
(
4
,
7
,
True
)
tdSql
.
checkData
(
6
,
7
,
None
)
self
.
checkJson
(
cfgPath
,
"tmq_taosx_tmp_snapshot"
)
self
.
checkData
()
tdSql
.
checkData
(
0
,
8
,
None
)
tdSql
.
checkData
(
1
,
8
,
None
)
tdSql
.
checkData
(
4
,
8
,
None
)
tdSql
.
checkData
(
6
,
8
,
None
)
return
tdSql
.
query
(
"select * from ct1"
)
tdSql
.
checkRows
(
4
)
def
checkSnapshotMultiVgroups
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'
%
(
buildPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
tdSql
.
query
(
"select * from ct2"
)
tdSql
.
checkRows
(
0
)
self
.
checkData
()
tdSql
.
query
(
"select * from ct0 order by c1"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
3
,
"a"
)
tdSql
.
checkData
(
1
,
4
,
None
)
return
tdSql
.
query
(
"select * from n1 order by cc3 desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
"eeee"
)
tdSql
.
checkData
(
1
,
2
,
940
)
def
checkSnapshotMultiVgroupsWithDropTable
(
self
):
buildPath
=
tdCom
.
getBuildPath
()
cmdStr
=
'%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'
%
(
buildPath
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
tdSql
.
query
(
"select * from jt order by i desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
11
)
tdSql
.
checkData
(
0
,
2
,
None
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
2
,
'{"k1":1,"k2":"hello"}'
)
self
.
checkDropData
()
return
def
run
(
self
):
tdSql
.
prepare
()
self
.
checkFileContent
()
self
.
checkFileContentSnapshot
()
self
.
checkWal1Vgroup
()
self
.
checkSnapshot1Vgroup
()
self
.
checkWalMultiVgroups
()
self
.
checkSnapshotMultiVgroups
()
self
.
checkWalMultiVgroupsWithDropTable
()
self
.
checkSnapshotMultiVgroupsWithDropTable
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/test/c/CMakeLists.txt
浏览文件 @
ed137b36
...
...
@@ -2,7 +2,6 @@ add_executable(tmq_demo tmqDemo.c)
add_executable
(
tmq_sim tmqSim.c
)
add_executable
(
create_table createTable.c
)
add_executable
(
tmq_taosx_ci tmq_taosx_ci.c
)
add_executable
(
tmq_taosx_snapshot_ci tmq_taosx_snapshot_ci.c
)
add_executable
(
sml_test sml_test.c
)
target_link_libraries
(
create_table
...
...
@@ -32,13 +31,6 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries
(
tmq_taosx_snapshot_ci
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries
(
sml_test
...
...
tests/test/c/tmq_taosx_ci.c
浏览文件 @
ed137b36
...
...
@@ -22,8 +22,16 @@
#include "types.h"
static
int
running
=
1
;
TdFilePtr
g_fp
=
NULL
;
char
dir
[
64
]
=
{
0
};
TdFilePtr
g_fp
=
NULL
;
typedef
struct
{
bool
snapShot
;
bool
dropTable
;
int
srcVgroups
;
int
dstVgroups
;
char
dir
[
64
];
}
Config
;
Config
g_conf
=
{
0
};
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -41,7 +49,6 @@ static TAOS* use_db(){
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
...
...
@@ -51,8 +58,11 @@ static void msg_process(TAOS_RES* msg) {
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
taosFprintfFile
(
g_fp
,
result
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
if
(
g_fp
){
taosFprintfFile
(
g_fp
,
result
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
}
tmq_free_json_meta
(
result
);
}
...
...
@@ -61,22 +71,10 @@ static void msg_process(TAOS_RES* msg) {
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
int32_t
init_env
()
{
int32_t
init_env
(
Config
*
conf
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
...
...
@@ -89,13 +87,22 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 1"
);
char
sql
[
128
]
=
{
0
};
snprintf
(
sql
,
128
,
"create database if not exists db_taosx vgroups %d"
,
conf
->
dstVgroups
);
pRes
=
taos_query
(
pConn
,
sql
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop topic if exists topic_db"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop topic, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
@@ -103,7 +110,8 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 1"
);
snprintf
(
sql
,
128
,
"create database if not exists abc1 vgroups %d"
,
conf
->
srcVgroups
);
pRes
=
taos_query
(
pConn
,
sql
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -133,7 +141,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833
6
00, 1, 2, 'a')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833
4
00, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -168,7 +176,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(162600683360
2
, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(162600683360
3
, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -224,6 +232,22 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
if
(
conf
->
dropTable
){
pRes
=
taos_query
(
pConn
,
"drop table ct3 ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
}
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
@@ -273,6 +297,15 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
if
(
conf
->
dropTable
){
pRes
=
taos_query
(
pConn
,
"drop table n1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
}
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
@@ -308,6 +341,23 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
if
(
conf
->
dropTable
){
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop table st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to drop super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
}
taos_close
(
pConn
);
return
0
;
}
...
...
@@ -327,9 +377,9 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_
ctb_column
with meta as database abc1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_
db
with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_
ctb_column
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create topic topic_
db
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
...
...
@@ -342,18 +392,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_t
*
build_consumer
(
Config
*
config
)
{
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
...
...
@@ -363,7 +402,9 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.heartbeat.background"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
if
(
config
->
snapShot
){
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
}
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
@@ -374,8 +415,7 @@ tmq_t* build_consumer() {
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
tmq_list_append
(
topic_list
,
"topic_db"
);
return
topic_list
;
}
...
...
@@ -393,12 +433,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
else
{
break
;
}
...
...
@@ -411,52 +446,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
initLogFile
()
{
void
initLogFile
(
Config
*
conf
)
{
char
f1
[
256
]
=
{
0
};
char
f2
[
256
]
=
{
0
};
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp.source"
,
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp.result"
,
dir
);
if
(
conf
->
snapShot
){
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp_snapshot.source"
,
conf
->
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp_snapshot.result"
,
conf
->
dir
);
}
else
{
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp.source"
,
conf
->
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp.result"
,
conf
->
dir
);
}
TdFilePtr
pFile
=
taosOpenFile
(
f1
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f1
);
...
...
@@ -469,53 +470,82 @@ void initLogFile() {
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f2
);
exit
(
-
1
);
}
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:16}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:3000}]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c4
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:1,
\"
colName
\"
:
\"
t2
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
alterType
\"
:4,
\"
colName
\"
:
\"
t1
\"
,
\"
colValue
\"
:
\"
5000
\"
,
\"
colValueNull
\"
:false}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:4}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c2
\"
,
\"
colType
\"
:10,
\"
colLength
\"
:8}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:10,
\"
colName
\"
:
\"
c3
\"
,
\"
colNewName
\"
:
\"
cc3
\"
}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:9}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:6,
\"
colName
\"
:
\"
c1
\"
}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
if
(
conf
->
snapShot
){
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:64},{
\"
name
\"
:
\"
c4
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1},{
\"
name
\"
:
\"
t2
\"
,
\"
type
\"
:8,
\"
length
\"
:64}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:5000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
cc3
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
,
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
}
}
else
{
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:16}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:3,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:3000}]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c4
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
alterType
\"
:1,
\"
colName
\"
:
\"
t2
\"
,
\"
colType
\"
:8,
\"
colLength
\"
:64}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
alterType
\"
:4,
\"
colName
\"
:
\"
t1
\"
,
\"
colValue
\"
:
\"
5000
\"
,
\"
colValueNull
\"
:false}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:4}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:5,
\"
colName
\"
:
\"
c3
\"
,
\"
colType
\"
:5}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:7,
\"
colName
\"
:
\"
c2
\"
,
\"
colType
\"
:10,
\"
colLength
\"
:8}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:10,
\"
colName
\"
:
\"
c3
\"
,
\"
colNewName
\"
:
\"
cc3
\"
}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:9}"
,
"{
\"
type
\"
:
\"
alter
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
alterType
\"
:6,
\"
colName
\"
:
\"
c1
\"
}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
}
}
taosCloseFile
(
&
pFile2
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
==
3
&&
strcmp
(
argv
[
1
],
"-c"
)
==
0
)
{
strcpy
(
dir
,
argv
[
2
]);
}
else
{
// strcpy(dir, "../../../sim/psim/cfg");
strcpy
(
dir
,
"/var/log"
);
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
){
strcpy
(
g_conf
.
dir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
){
g_conf
.
snapShot
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
){
g_conf
.
dropTable
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-sv"
)
==
0
){
g_conf
.
srcVgroups
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-dv"
)
==
0
){
g_conf
.
dstVgroups
=
atol
(
argv
[
++
i
]);
}
}
printf
(
"env init
\n
"
);
initLogFile
();
if
(
strlen
(
g_conf
.
dir
)
!=
0
){
initLogFile
(
&
g_conf
);
}
if
(
init_env
()
<
0
)
{
if
(
init_env
(
&
g_conf
)
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_t
*
tmq
=
build_consumer
(
&
g_conf
);
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile
(
&
g_fp
);
}
tests/test/c/tmq_taosx_snapshot_ci.c
已删除
100644 → 0
浏览文件 @
1238365c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
static
int
running
=
1
;
TdFilePtr
g_fp
=
NULL
;
char
dir
[
64
]
=
{
0
};
static
TAOS
*
use_db
(){
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
NULL
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
NULL
;
}
taos_free_result
(
pRes
);
return
pConn
;
}
static
void
msg_process
(
TAOS_RES
*
msg
)
{
/*memset(buf, 0, 1024);*/
printf
(
"-----------topic-------------: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
TAOS
*
pConn
=
use_db
();
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
char
*
result
=
tmq_get_json_meta
(
msg
);
if
(
result
)
{
printf
(
"meta result: %s
\n
"
,
result
);
}
taosFprintfFile
(
g_fp
,
result
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
tmq_free_json_meta
(
result
);
}
tmq_raw_data
raw
=
{
0
};
tmq_get_raw
(
msg
,
&
raw
);
int32_t
ret
=
tmq_write_raw
(
pConn
,
raw
);
printf
(
"write raw data: %s
\n
"
,
tmq_err2str
(
ret
));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close
(
pConn
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop database if exists db_taosx"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists db_taosx vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db_taosx, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"drop database if exists abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct0 using st1 tags(1000,
\"
ttt
\"
, true)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct0 values(1626006833600, 1, 2, 'a')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct0, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct1 using st1(t1) tags(2000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct2 using st1(t1) tags(NULL)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct1 values(1626006833600, 3, 4, 'b')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists ct3 using st1(t1) tags(3000)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add column c4 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 modify column c3 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into ct3 select * from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table st1 add tag t2 binary(64)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table ct3 set tag t1=5000"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to slter child table ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"delete from abc1 .ct3 where ts < 1626006833606"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into ct3, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 add column c3 bigint"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 modify column c2 nchar(8)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 rename column c3 cc3"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 comment 'hello'"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"alter table n1 drop column c1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to alter normal table n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to insert into n1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt(ts timestamp, i int) tags(t json)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt1 using jt tags('{
\"
k1
\"
:1,
\"
k2
\"
:
\"
hello
\"
}')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table jt2 using jt tags('')"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt1 values(now, 1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"insert into jt2 values(now, 11)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table jt2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
int32_t
create_topic
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
printf
(
"commit %d tmq %p param %p
\n
"
,
code
,
tmq
,
param
);
}
tmq_t
*
build_consumer
()
{
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"client.id"
,
"my app 1"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.heartbeat.background"
,
"true"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"topic_ctb_column"
);
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
else
{
break
;
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
tmq_list_t
*
subList
=
NULL
;
tmq_subscription
(
tmq
,
&
subList
);
char
**
subTopics
=
tmq_list_to_c_array
(
subList
);
int32_t
sz
=
tmq_list_get_size
(
subList
);
printf
(
"subscribed topics: "
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"%s, "
,
subTopics
[
i
]);
}
printf
(
"
\n
"
);
tmq_list_destroy
(
subList
);
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
code
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
initLogFile
()
{
char
f1
[
256
]
=
{
0
};
char
f2
[
256
]
=
{
0
};
sprintf
(
f1
,
"%s/../log/tmq_taosx_tmp_snapshot.source"
,
dir
);
sprintf
(
f2
,
"%s/../log/tmq_taosx_tmp_snapshot.result"
,
dir
);
TdFilePtr
pFile
=
taosOpenFile
(
f1
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f1
);
exit
(
-
1
);
}
g_fp
=
pFile
;
TdFilePtr
pFile2
=
taosOpenFile
(
f2
,
TD_FILE_TEXT
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile2
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
f2
);
exit
(
-
1
);
}
char
*
result
[]
=
{
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
st1
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:6},{
\"
name
\"
:
\"
c3
\"
,
\"
type
\"
:8,
\"
length
\"
:64},{
\"
name
\"
:
\"
c4
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1},{
\"
name
\"
:
\"
t2
\"
,
\"
type
\"
:8,
\"
length
\"
:64}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct0
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:1000},{
\"
name
\"
:
\"
t3
\"
,
\"
type
\"
:10,
\"
value
\"
:
\"\\\"
ttt
\\\"\"
},{
\"
name
\"
:
\"
t4
\"
,
\"
type
\"
:1,
\"
value
\"
:1}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:2000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
ct3
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
st1
\"
,
\"
tagNum
\"
:4,
\"
tags
\"
:[{
\"
name
\"
:
\"
t1
\"
,
\"
type
\"
:4,
\"
value
\"
:5000}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
n1
\"
,
\"
tableType
\"
:
\"
normal
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
c2
\"
,
\"
type
\"
:10,
\"
length
\"
:8},{
\"
name
\"
:
\"
cc3
\"
,
\"
type
\"
:5}],
\"
tags
\"
:[]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt
\"
,
\"
tableType
\"
:
\"
super
\"
,
\"
columns
\"
:[{
\"
name
\"
:
\"
ts
\"
,
\"
type
\"
:9},{
\"
name
\"
:
\"
i
\"
,
\"
type
\"
:4}],
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt1
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[{
\"
name
\"
:
\"
t
\"
,
\"
type
\"
:15,
\"
value
\"
:
\"
{
\\\"
k1
\\\"
:1,
\\\"
k2
\\\"
:
\\\"
hello
\\\"
}
\"
}]}"
,
"{
\"
type
\"
:
\"
create
\"
,
\"
tableName
\"
:
\"
jt2
\"
,
\"
tableType
\"
:
\"
child
\"
,
\"
using
\"
:
\"
jt
\"
,
\"
tagNum
\"
:1,
\"
tags
\"
:[]}"
,
};
for
(
int
i
=
0
;
i
<
sizeof
(
result
)
/
sizeof
(
result
[
0
]);
i
++
){
taosFprintfFile
(
pFile2
,
result
[
i
]);
taosFprintfFile
(
pFile2
,
"
\n
"
);
}
taosCloseFile
(
&
pFile2
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
==
3
&&
strcmp
(
argv
[
1
],
"-c"
)
==
0
)
{
strcpy
(
dir
,
argv
[
2
]);
}
else
{
// strcpy(dir, "../../../sim/psim/cfg");
strcpy
(
dir
,
"/var/log"
);
}
printf
(
"env init
\n
"
);
initLogFile
();
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile
(
&
g_fp
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录