Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
67ecaafb
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
67ecaafb
编写于
5月 07, 2021
作者:
sangshuduo
提交者:
GitHub
5月 07, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-3992]<fix>: taosdemo subscribe. (#6035)
Co-authored-by:
N
Shuduo Sang
<
sdsang@taosdata.com
>
上级
5cd171b7
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
75 addition
and
64 deletion
+75
-64
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+75
-64
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
67ecaafb
...
@@ -1630,35 +1630,41 @@ static void printfQueryMeta() {
...
@@ -1630,35 +1630,41 @@ static void printfQueryMeta() {
printf
(
"database name:
\033
[33m%s
\033
[0m
\n
"
,
g_queryInfo
.
dbName
);
printf
(
"database name:
\033
[33m%s
\033
[0m
\n
"
,
g_queryInfo
.
dbName
);
printf
(
"
\n
"
);
printf
(
"
\n
"
);
if
((
SUBSCRIBE_TEST
==
g_args
.
test_mode
)
||
(
QUERY_TEST
==
g_args
.
test_mode
))
{
printf
(
"specified table query info:
\n
"
);
printf
(
"specified table query info:
\n
"
);
printf
(
"query interval:
\033
[33m%"
PRIu64
" ms
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
queryInterval
);
printf
(
"top query times:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_args
.
query_times
);
printf
(
"concurrent:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
printf
(
"sqlCount:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"sqlCount:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
);
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
);
if
(
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
>
0
)
{
printf
(
"specified tbl query times:
\n
"
);
printf
(
"specified tbl query times:
\n
"
);
printf
(
"
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
queryTimes
);
g_queryInfo
.
specifiedQueryInfo
.
queryTimes
);
printf
(
"query interval:
\033
[33m%"
PRIu64
" ms
\033
[0m
\n
"
,
if
(
SUBSCRIBE_TEST
==
g_args
.
test_mode
)
{
g_queryInfo
.
specifiedQueryInfo
.
queryInterval
);
printf
(
"mod:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"top query times:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_args
.
query_times
);
g_queryInfo
.
specifiedQueryInfo
.
mode
);
printf
(
"concurrent:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
printf
(
"mod:
\033
[33m%s
\033
[0m
\n
"
,
(
g_queryInfo
.
specifiedQueryInfo
.
mode
)
?
"async"
:
"sync"
);
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
);
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
);
printf
(
"keepProgress:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"keepProgress:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeKeepProgress
);
g_queryInfo
.
specifiedQueryInfo
.
subscribeKeepProgress
);
}
for
(
uint64_t
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
for
(
uint64_t
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
printf
(
" sql[%"
PRIu64
"]:
\033
[33m%s
\033
[0m
\n
"
,
printf
(
" sql[%"
PRIu64
"]:
\033
[33m%s
\033
[0m
\n
"
,
i
,
g_queryInfo
.
specifiedQueryInfo
.
sql
[
i
]);
i
,
g_queryInfo
.
specifiedQueryInfo
.
sql
[
i
]);
}
}
printf
(
"
\n
"
);
printf
(
"
\n
"
);
}
printf
(
"super table query info:
\n
"
);
printf
(
"super table query info:
\n
"
);
printf
(
"sqlCount:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
sqlCount
);
if
(
g_queryInfo
.
superQueryInfo
.
sqlCount
>
0
)
{
printf
(
"query interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"query interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
queryInterval
);
g_queryInfo
.
superQueryInfo
.
queryInterval
);
printf
(
"threadCnt:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"threadCnt:
\033
[33m%d
\033
[0m
\n
"
,
...
@@ -1670,24 +1676,22 @@ static void printfQueryMeta() {
...
@@ -1670,24 +1676,22 @@ static void printfQueryMeta() {
printf
(
"stb query times:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"stb query times:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
queryTimes
);
g_queryInfo
.
superQueryInfo
.
queryTimes
);
if
(
SUBSCRIBE_TEST
==
g_args
.
test_mode
)
{
printf
(
"mod:
\033
[33m%s
\033
[0m
\n
"
,
printf
(
"mod:
\033
[33m%d
\033
[0m
\n
"
,
(
g_queryInfo
.
superQueryInfo
.
mode
)
?
"async"
:
"sync"
);
g_queryInfo
.
superQueryInfo
.
mode
);
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeInterval
);
g_queryInfo
.
superQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeRestart
);
g_queryInfo
.
superQueryInfo
.
subscribeRestart
);
printf
(
"keepProgress:
\033
[33m%d
\033
[0m
\n
"
,
printf
(
"keepProgress:
\033
[33m%d
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeKeepProgress
);
g_queryInfo
.
superQueryInfo
.
subscribeKeepProgress
);
}
printf
(
"sqlCount:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
sqlCount
);
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
printf
(
" sql[%d]:
\033
[33m%s
\033
[0m
\n
"
,
printf
(
" sql[%d]:
\033
[33m%s
\033
[0m
\n
"
,
i
,
g_queryInfo
.
superQueryInfo
.
sql
[
i
]);
i
,
g_queryInfo
.
superQueryInfo
.
sql
[
i
]);
}
}
printf
(
"
\n
"
);
printf
(
"
\n
"
);
}
}
SHOW_PARSE_RESULT_END
();
SHOW_PARSE_RESULT_END
();
}
}
...
@@ -2847,7 +2851,7 @@ static void* createTable(void *sarg)
...
@@ -2847,7 +2851,7 @@ static void* createTable(void *sarg)
}
}
static
int
startMultiThreadCreateChildTable
(
static
int
startMultiThreadCreateChildTable
(
char
*
cols
,
int
threads
,
int64_t
startFrom
,
int64_t
ntables
,
char
*
cols
,
int
threads
,
uint64_t
startFrom
,
u
int64_t
ntables
,
char
*
db_name
,
SSuperTable
*
superTblInfo
)
{
char
*
db_name
,
SSuperTable
*
superTblInfo
)
{
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
pthread_t
*
pids
=
malloc
(
threads
*
sizeof
(
pthread_t
));
...
@@ -2862,13 +2866,13 @@ static int startMultiThreadCreateChildTable(
...
@@ -2862,13 +2866,13 @@ static int startMultiThreadCreateChildTable(
threads
=
1
;
threads
=
1
;
}
}
int64_t
a
=
ntables
/
threads
;
u
int64_t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
if
(
a
<
1
)
{
threads
=
ntables
;
threads
=
ntables
;
a
=
1
;
a
=
1
;
}
}
int64_t
b
=
0
;
u
int64_t
b
=
0
;
b
=
ntables
%
threads
;
b
=
ntables
%
threads
;
for
(
int64_t
i
=
0
;
i
<
threads
;
i
++
)
{
for
(
int64_t
i
=
0
;
i
<
threads
;
i
++
)
{
...
@@ -4212,7 +4216,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
...
@@ -4212,7 +4216,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
}
}
}
// su
b
_table_query
// su
per
_table_query
cJSON
*
superQuery
=
cJSON_GetObjectItem
(
root
,
"super_table_query"
);
cJSON
*
superQuery
=
cJSON_GetObjectItem
(
root
,
"super_table_query"
);
if
(
!
superQuery
)
{
if
(
!
superQuery
)
{
g_queryInfo
.
superQueryInfo
.
threadCnt
=
1
;
g_queryInfo
.
superQueryInfo
.
threadCnt
=
1
;
...
@@ -5679,13 +5683,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
...
@@ -5679,13 +5683,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
taos_close
(
taos
);
taos_close
(
taos
);
in
t
a
=
ntables
/
threads
;
uint64_
t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
if
(
a
<
1
)
{
threads
=
ntables
;
threads
=
ntables
;
a
=
1
;
a
=
1
;
}
}
in
t
b
=
0
;
uint64_
t
b
=
0
;
if
(
threads
!=
0
)
{
if
(
threads
!=
0
)
{
b
=
ntables
%
threads
;
b
=
ntables
%
threads
;
}
}
...
@@ -6380,7 +6384,7 @@ static int queryTestProcess() {
...
@@ -6380,7 +6384,7 @@ static int queryTestProcess() {
b
=
ntables
%
threads
;
b
=
ntables
%
threads
;
}
}
in
t
startFrom
=
0
;
uint64_
t
startFrom
=
0
;
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
t_info
=
infosOfSub
+
i
;
threadInfo
*
t_info
=
infosOfSub
+
i
;
t_info
->
threadID
=
i
;
t_info
->
threadID
=
i
;
...
@@ -6436,13 +6440,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
...
@@ -6436,13 +6440,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
}
}
getResult
(
res
,
(
char
*
)
param
);
getResult
(
res
,
(
char
*
)
param
);
taos_free_result
(
res
);
// tao_unscribe() will free result.
}
}
static
TAOS_SUB
*
subscribeImpl
(
TAOS
*
taos
,
char
*
sql
,
char
*
topic
,
char
*
resultFileName
)
{
static
TAOS_SUB
*
subscribeImpl
(
TAOS
*
taos
,
char
*
sql
,
char
*
topic
,
char
*
resultFileName
)
{
TAOS_SUB
*
tsub
=
NULL
;
TAOS_SUB
*
tsub
=
NULL
;
if
(
g_queryInfo
.
specifiedQueryInfo
.
mode
)
{
if
(
ASYNC_QUERY_MODE
==
g_queryInfo
.
specifiedQueryInfo
.
mode
)
{
tsub
=
taos_subscribe
(
taos
,
tsub
=
taos_subscribe
(
taos
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
,
topic
,
sql
,
subscribe_callback
,
(
void
*
)
resultFileName
,
topic
,
sql
,
subscribe_callback
,
(
void
*
)
resultFileName
,
...
@@ -6466,6 +6471,9 @@ static void *superSubscribe(void *sarg) {
...
@@ -6466,6 +6471,9 @@ static void *superSubscribe(void *sarg) {
char
subSqlstr
[
1024
];
char
subSqlstr
[
1024
];
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
]
=
{
0
};
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
]
=
{
0
};
if
(
g_queryInfo
.
superQueryInfo
.
sqlCount
==
0
)
return
NULL
;
if
(
pThreadInfo
->
taos
==
NULL
)
{
if
(
pThreadInfo
->
taos
==
NULL
)
{
TAOS
*
taos
=
NULL
;
TAOS
*
taos
=
NULL
;
taos
=
taos_connect
(
g_queryInfo
.
host
,
taos
=
taos_connect
(
g_queryInfo
.
host
,
...
@@ -6524,7 +6532,7 @@ static void *superSubscribe(void *sarg) {
...
@@ -6524,7 +6532,7 @@ static void *superSubscribe(void *sarg) {
TAOS_RES
*
res
=
NULL
;
TAOS_RES
*
res
=
NULL
;
while
(
1
)
{
while
(
1
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
if
(
1
==
g_queryInfo
.
superQueryInfo
.
mode
)
{
if
(
ASYNC_QUERY_MODE
==
g_queryInfo
.
superQueryInfo
.
mode
)
{
continue
;
continue
;
}
}
...
@@ -6554,6 +6562,9 @@ static void *specifiedSubscribe(void *sarg) {
...
@@ -6554,6 +6562,9 @@ static void *specifiedSubscribe(void *sarg) {
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
threadInfo
*
pThreadInfo
=
(
threadInfo
*
)
sarg
;
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
]
=
{
0
};
TAOS_SUB
*
tsub
[
MAX_QUERY_SQL_COUNT
]
=
{
0
};
if
(
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
==
0
)
return
NULL
;
if
(
pThreadInfo
->
taos
==
NULL
)
{
if
(
pThreadInfo
->
taos
==
NULL
)
{
TAOS
*
taos
=
NULL
;
TAOS
*
taos
=
NULL
;
taos
=
taos_connect
(
g_queryInfo
.
host
,
taos
=
taos_connect
(
g_queryInfo
.
host
,
...
@@ -6591,7 +6602,7 @@ static void *specifiedSubscribe(void *sarg) {
...
@@ -6591,7 +6602,7 @@ static void *specifiedSubscribe(void *sarg) {
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
sprintf
(
topic
,
"taosdemo-subscribe-%d"
,
i
);
sprintf
(
topic
,
"taosdemo-subscribe-%d"
,
i
);
char
tmpFile
[
MAX_FILE_NAME_LEN
*
2
]
=
{
0
};
char
tmpFile
[
MAX_FILE_NAME_LEN
*
2
]
=
{
0
};
if
(
g_queryInfo
.
s
uper
QueryInfo
.
result
[
i
][
0
]
!=
0
)
{
if
(
g_queryInfo
.
s
pecified
QueryInfo
.
result
[
i
][
0
]
!=
0
)
{
sprintf
(
tmpFile
,
"%s-%d"
,
sprintf
(
tmpFile
,
"%s-%d"
,
g_queryInfo
.
specifiedQueryInfo
.
result
[
i
],
pThreadInfo
->
threadID
);
g_queryInfo
.
specifiedQueryInfo
.
result
[
i
],
pThreadInfo
->
threadID
);
}
}
...
@@ -6610,7 +6621,7 @@ static void *specifiedSubscribe(void *sarg) {
...
@@ -6610,7 +6621,7 @@ static void *specifiedSubscribe(void *sarg) {
TAOS_RES
*
res
=
NULL
;
TAOS_RES
*
res
=
NULL
;
while
(
1
)
{
while
(
1
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
if
(
SYNC_QUERY_MODE
==
g_queryInfo
.
specifiedQueryInfo
.
mode
)
{
if
(
A
SYNC_QUERY_MODE
==
g_queryInfo
.
specifiedQueryInfo
.
mode
)
{
continue
;
continue
;
}
}
...
@@ -6710,21 +6721,21 @@ static int subscribeTestProcess() {
...
@@ -6710,21 +6721,21 @@ static int subscribeTestProcess() {
exit
(
-
1
);
exit
(
-
1
);
}
}
in
t
ntables
=
g_queryInfo
.
superQueryInfo
.
childTblCount
;
uint64_
t
ntables
=
g_queryInfo
.
superQueryInfo
.
childTblCount
;
int
threads
=
g_queryInfo
.
superQueryInfo
.
threadCnt
;
int
threads
=
g_queryInfo
.
superQueryInfo
.
threadCnt
;
in
t
a
=
ntables
/
threads
;
uint64_
t
a
=
ntables
/
threads
;
if
(
a
<
1
)
{
if
(
a
<
1
)
{
threads
=
ntables
;
threads
=
ntables
;
a
=
1
;
a
=
1
;
}
}
in
t
b
=
0
;
uint64_
t
b
=
0
;
if
(
threads
!=
0
)
{
if
(
threads
!=
0
)
{
b
=
ntables
%
threads
;
b
=
ntables
%
threads
;
}
}
in
t
startFrom
=
0
;
uint64_
t
startFrom
=
0
;
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
for
(
int
i
=
0
;
i
<
threads
;
i
++
)
{
threadInfo
*
t_info
=
infosOfSub
+
i
;
threadInfo
*
t_info
=
infosOfSub
+
i
;
t_info
->
threadID
=
i
;
t_info
->
threadID
=
i
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录