Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
weixin_54624550
TDengine
提交
62289a0b
T
TDengine
项目概览
weixin_54624550
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
62289a0b
编写于
5月 10, 2021
作者:
S
Shuduo Sang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-3992]<fix>: taosdemo subscribe.
refactor sync/async mode.
上级
05772f52
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
57 addition
and
71 deletion
+57
-71
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+57
-71
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
62289a0b
...
...
@@ -68,12 +68,6 @@ enum TEST_MODE {
INVAID_TEST
};
enum
QUERY_MODE
{
SYNC_QUERY_MODE
,
// 0
ASYNC_QUERY_MODE
,
// 1
INVALID_MODE
};
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define MAX_USERNAME_SIZE 64
...
...
@@ -118,8 +112,8 @@ typedef enum TALBE_EXISTS_EN {
}
TALBE_EXISTS_EN
;
enum
MODE
{
SYNC
,
ASYNC
,
SYNC
_MODE
,
ASYNC
_MODE
,
MODE_BUT
};
...
...
@@ -205,7 +199,7 @@ typedef struct SArguments_S {
bool
verbose_print
;
bool
performance_print
;
char
*
output_file
;
uint32_t
query
_mode
;
bool
async
_mode
;
char
*
datatype
[
MAX_NUM_DATATYPE
+
1
];
uint32_t
len_of_binary
;
uint32_t
num_of_CPR
;
...
...
@@ -343,7 +337,7 @@ typedef struct SDbs_S {
bool
use_metric
;
bool
insert_only
;
bool
do_aggreFunc
;
bool
query
Mode
;
bool
async
Mode
;
uint32_t
threadCount
;
uint32_t
threadCountByCreateTbl
;
...
...
@@ -360,7 +354,7 @@ typedef struct SpecifiedQueryInfo_S {
uint64_t
queryInterval
;
// 0: unlimit > 0 loop/s
uint64_t
concurrent
;
uint64_t
sqlCount
;
uint32_t
m
ode
;
// 0: sync, 1: async
uint32_t
asyncM
ode
;
// 0: sync, 1: async
uint64_t
subscribeInterval
;
// ms
uint64_t
queryTimes
;
int
subscribeRestart
;
...
...
@@ -375,7 +369,7 @@ typedef struct SuperQueryInfo_S {
char
sTblName
[
MAX_TB_NAME_SIZE
+
1
];
uint64_t
queryInterval
;
// 0: unlimit > 0 loop/s
uint32_t
threadCnt
;
uint32_t
m
ode
;
// 0: sync, 1: async
uint32_t
asyncM
ode
;
// 0: sync, 1: async
uint64_t
subscribeInterval
;
// ms
int
subscribeRestart
;
int
subscribeKeepProgress
;
...
...
@@ -771,49 +765,48 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
arguments
->
sqlFile
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-q"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-q need a number following!
\n
Query mode -- 0: SYNC, 1: ASYNC. Default is SYNC.
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
query
_mode
=
atoi
(
argv
[
++
i
]);
arguments
->
async
_mode
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-T"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-T need a number following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
num_of_threads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-i need a number following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
insert_interval
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-qt"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
]))
||
(
atoi
(
argv
[
i
+
1
])
<=
0
))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-qt need a
valid (>0)
number following!
\n
"
);
errorPrint
(
"%s"
,
"
\n\t
-qt need a number following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
query_times
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-B"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-B need a number following!
\n
"
);
exit
(
EXIT_FAILURE
);
}
arguments
->
interlace_rows
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
if
((
argc
==
i
+
1
)
||
(
!
isStringNumber
(
argv
[
i
+
1
])))
{
printHelp
();
errorPrint
(
"%s"
,
"
\n\t
-r need a number following!
\n
"
);
exit
(
EXIT_FAILURE
);
...
...
@@ -1073,7 +1066,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
if
(
code
!=
0
)
{
if
(
!
quiet
)
{
debugPrint
(
"%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
errorPrint
(
"Failed to
execute
%s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
errorPrint
(
"Failed to
run
%s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
}
taos_free_result
(
res
);
//taos_close(taos);
...
...
@@ -1646,7 +1639,7 @@ static void printfQueryMeta() {
printf
(
"concurrent:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
concurrent
);
printf
(
"mod:
\033
[33m%s
\033
[0m
\n
"
,
(
g_queryInfo
.
specifiedQueryInfo
.
m
ode
)
?
"async"
:
"sync"
);
(
g_queryInfo
.
specifiedQueryInfo
.
asyncM
ode
)
?
"async"
:
"sync"
);
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
...
...
@@ -1678,7 +1671,7 @@ static void printfQueryMeta() {
g_queryInfo
.
superQueryInfo
.
queryTimes
);
printf
(
"mod:
\033
[33m%s
\033
[0m
\n
"
,
(
g_queryInfo
.
superQueryInfo
.
m
ode
)
?
"async"
:
"sync"
);
(
g_queryInfo
.
superQueryInfo
.
asyncM
ode
)
?
"async"
:
"sync"
);
printf
(
"interval:
\033
[33m%"
PRIu64
"
\033
[0m
\n
"
,
g_queryInfo
.
superQueryInfo
.
subscribeInterval
);
printf
(
"restart:
\033
[33m%d
\033
[0m
\n
"
,
...
...
@@ -4041,9 +4034,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON
*
gQueryTimes
=
cJSON_GetObjectItem
(
root
,
"query_times"
);
if
(
gQueryTimes
&&
gQueryTimes
->
type
==
cJSON_Number
)
{
if
(
gQueryTimes
->
valueint
<
=
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
: %"
PRId64
", need be a valid (>0) number
\n
"
,
__func__
,
__LINE__
,
gQueryTimes
->
valueint
);
if
(
gQueryTimes
->
valueint
<
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
input mistake
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
g_args
.
query_times
=
gQueryTimes
->
valueint
;
...
...
@@ -4092,9 +4085,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON
*
specifiedQueryTimes
=
cJSON_GetObjectItem
(
specifiedQuery
,
"query_times"
);
if
(
specifiedQueryTimes
&&
specifiedQueryTimes
->
type
==
cJSON_Number
)
{
if
(
specifiedQueryTimes
->
valueint
<
=
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
: %"
PRId64
", need be a valid (>0) number
\n
"
,
__func__
,
__LINE__
,
specifiedQueryTimes
->
valueint
);
if
(
specifiedQueryTimes
->
valueint
<
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
input mistake
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
...
...
@@ -4121,20 +4114,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo
.
specifiedQueryInfo
.
concurrent
=
1
;
}
cJSON
*
m
ode
=
cJSON_GetObjectItem
(
specifiedQuery
,
"mode"
);
if
(
mode
&&
m
ode
->
type
==
cJSON_String
&&
m
ode
->
valuestring
!=
NULL
)
{
if
(
0
==
strcmp
(
"sync"
,
m
ode
->
valuestring
))
{
g_queryInfo
.
specifiedQueryInfo
.
mode
=
SYNC_QUERY
_MODE
;
}
else
if
(
0
==
strcmp
(
"async"
,
m
ode
->
valuestring
))
{
g_queryInfo
.
specifiedQueryInfo
.
mode
=
ASYNC_QUERY
_MODE
;
cJSON
*
specifiedAsyncM
ode
=
cJSON_GetObjectItem
(
specifiedQuery
,
"mode"
);
if
(
specifiedAsyncMode
&&
specifiedAsyncM
ode
->
type
==
cJSON_String
&&
specifiedAsyncM
ode
->
valuestring
!=
NULL
)
{
if
(
0
==
strcmp
(
"sync"
,
specifiedAsyncM
ode
->
valuestring
))
{
g_queryInfo
.
specifiedQueryInfo
.
asyncMode
=
SYNC
_MODE
;
}
else
if
(
0
==
strcmp
(
"async"
,
specifiedAsyncM
ode
->
valuestring
))
{
g_queryInfo
.
specifiedQueryInfo
.
asyncMode
=
ASYNC
_MODE
;
}
else
{
errorPrint
(
"%s() LN%d, failed to read json,
query
mode input error
\n
"
,
errorPrint
(
"%s() LN%d, failed to read json,
async
mode input error
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
}
else
{
g_queryInfo
.
specifiedQueryInfo
.
mode
=
SYNC_QUERY
_MODE
;
g_queryInfo
.
specifiedQueryInfo
.
asyncMode
=
SYNC
_MODE
;
}
cJSON
*
interval
=
cJSON_GetObjectItem
(
specifiedQuery
,
"interval"
);
...
...
@@ -4236,9 +4229,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON
*
superQueryTimes
=
cJSON_GetObjectItem
(
superQuery
,
"query_times"
);
if
(
superQueryTimes
&&
superQueryTimes
->
type
==
cJSON_Number
)
{
if
(
superQueryTimes
->
valueint
<
=
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
: %"
PRId64
", need be a valid (>0) number
\n
"
,
__func__
,
__LINE__
,
superQueryTimes
->
valueint
);
if
(
superQueryTimes
->
valueint
<
0
)
{
errorPrint
(
"%s() LN%d, failed to read json, query_times
input mistake
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
g_queryInfo
.
superQueryInfo
.
queryTimes
=
superQueryTimes
->
valueint
;
...
...
@@ -4281,20 +4274,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
goto
PARSE_OVER
;
}
cJSON
*
su
bm
ode
=
cJSON_GetObjectItem
(
superQuery
,
"mode"
);
if
(
su
bmode
&&
subm
ode
->
type
==
cJSON_String
&&
su
bm
ode
->
valuestring
!=
NULL
)
{
if
(
0
==
strcmp
(
"sync"
,
su
bm
ode
->
valuestring
))
{
g_queryInfo
.
superQueryInfo
.
mode
=
SYNC_QUERY
_MODE
;
}
else
if
(
0
==
strcmp
(
"async"
,
su
bm
ode
->
valuestring
))
{
g_queryInfo
.
superQueryInfo
.
mode
=
ASYNC_QUERY
_MODE
;
cJSON
*
su
perAsyncM
ode
=
cJSON_GetObjectItem
(
superQuery
,
"mode"
);
if
(
su
perAsyncMode
&&
superAsyncM
ode
->
type
==
cJSON_String
&&
su
perAsyncM
ode
->
valuestring
!=
NULL
)
{
if
(
0
==
strcmp
(
"sync"
,
su
perAsyncM
ode
->
valuestring
))
{
g_queryInfo
.
superQueryInfo
.
asyncMode
=
SYNC
_MODE
;
}
else
if
(
0
==
strcmp
(
"async"
,
su
perAsyncM
ode
->
valuestring
))
{
g_queryInfo
.
superQueryInfo
.
asyncMode
=
ASYNC
_MODE
;
}
else
{
errorPrint
(
"%s() LN%d, failed to read json,
query
mode input error
\n
"
,
errorPrint
(
"%s() LN%d, failed to read json,
async
mode input error
\n
"
,
__func__
,
__LINE__
);
goto
PARSE_OVER
;
}
}
else
{
g_queryInfo
.
superQueryInfo
.
mode
=
SYNC_QUERY
_MODE
;
g_queryInfo
.
superQueryInfo
.
asyncMode
=
SYNC
_MODE
;
}
cJSON
*
superInterval
=
cJSON_GetObjectItem
(
superQuery
,
"interval"
);
...
...
@@ -5201,13 +5194,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTs
=
taosGetTimestampMs
();
if
(
recOfBatch
==
0
)
{
errorPrint
(
"[%d] %s() LN%d try inserting records of batch is %"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
);
errorPrint
(
"%s
\n
"
,
"
\t
Please check if the batch or the buffer length is proper value!
\n
"
);
goto
free_of_interlace
;
}
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
buffer
,
recOfBatch
);
endTs
=
taosGetTimestampMs
();
...
...
@@ -5748,10 +5734,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
*/
tsem_init
(
&
(
t_info
->
lock_sem
),
0
,
0
);
if
(
SYNC
==
g_Dbs
.
queryMode
)
{
pthread_create
(
pids
+
i
,
NULL
,
syncWrite
,
t_info
);
}
else
{
if
(
ASYNC_MODE
==
g_Dbs
.
asyncMode
)
{
pthread_create
(
pids
+
i
,
NULL
,
asyncWrite
,
t_info
);
}
else
{
pthread_create
(
pids
+
i
,
NULL
,
syncWrite
,
t_info
);
}
}
...
...
@@ -6455,7 +6441,7 @@ static TAOS_SUB* subscribeImpl(
TAOS
*
taos
,
char
*
sql
,
char
*
topic
,
char
*
resultFileName
)
{
TAOS_SUB
*
tsub
=
NULL
;
if
(
ASYNC_
QUERY_MODE
==
g_queryInfo
.
specifiedQueryInfo
.
m
ode
)
{
if
(
ASYNC_
MODE
==
g_queryInfo
.
specifiedQueryInfo
.
asyncM
ode
)
{
tsub
=
taos_subscribe
(
taos
,
g_queryInfo
.
specifiedQueryInfo
.
subscribeRestart
,
topic
,
sql
,
subscribe_callback
,
(
void
*
)
resultFileName
,
...
...
@@ -6540,7 +6526,7 @@ static void *superSubscribe(void *sarg) {
TAOS_RES
*
res
=
NULL
;
while
(
1
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
superQueryInfo
.
sqlCount
;
i
++
)
{
if
(
ASYNC_
QUERY_MODE
==
g_queryInfo
.
superQueryInfo
.
m
ode
)
{
if
(
ASYNC_
MODE
==
g_queryInfo
.
superQueryInfo
.
asyncM
ode
)
{
continue
;
}
...
...
@@ -6629,7 +6615,7 @@ static void *specifiedSubscribe(void *sarg) {
TAOS_RES
*
res
=
NULL
;
while
(
1
)
{
for
(
int
i
=
0
;
i
<
g_queryInfo
.
specifiedQueryInfo
.
sqlCount
;
i
++
)
{
if
(
ASYNC_
QUERY_MODE
==
g_queryInfo
.
specifiedQueryInfo
.
m
ode
)
{
if
(
ASYNC_
MODE
==
g_queryInfo
.
specifiedQueryInfo
.
asyncM
ode
)
{
continue
;
}
...
...
@@ -6852,7 +6838,7 @@ static void setParaFromArg(){
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblCount
=
g_args
.
num_of_tables
;
g_Dbs
.
threadCount
=
g_args
.
num_of_threads
;
g_Dbs
.
threadCountByCreateTbl
=
g_args
.
num_of_threads
;
g_Dbs
.
queryMode
=
g_args
.
query
_mode
;
g_Dbs
.
asyncMode
=
g_args
.
async
_mode
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
autoCreateTable
=
PRE_CREATE_SUBTBL
;
g_Dbs
.
db
[
0
].
superTbls
[
0
].
childTblExists
=
TBL_NO_EXISTS
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录