Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8f6b426a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8f6b426a
编写于
5月 19, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: modify tmq_sim processer
上级
944a9c6e
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
33 addition
and
20 deletion
+33
-20
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+33
-20
未找到文件。
tests/test/c/tmqSim.c
浏览文件 @
8f6b426a
...
...
@@ -98,16 +98,28 @@ static void printHelp() {
exit
(
EXIT_SUCCESS
);
}
char
*
getCurrentTimeString
(
char
*
timeString
)
{
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
sprintf
(
timeString
,
"%d-%02d-%02d %02d:%02d:%02d"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
return
timeString
;
}
void
initLogFile
()
{
time_t
now
;
struct
tm
curTime
;
char
filename
[
256
];
now
=
taosTime
(
NULL
);
taosLocalTime
(
&
now
,
&
curTime
);
sprintf
(
filename
,
"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt"
,
configDir
,
curTime
.
tm_year
+
1900
,
curTime
.
tm_mon
+
1
,
curTime
.
tm_mday
,
curTime
.
tm_hour
,
curTime
.
tm_min
,
curTime
.
tm_sec
);
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
char
filename
[
256
];
char
tmpString
[
128
];
sprintf
(
filename
,
"%s/../log/tmqlog_%s.txt"
,
configDir
,
getCurrentTimeString
(
tmpString
));
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr
pFile
=
taosOpenFile
(
filename
,
TD_FILE_TEXT
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
filename
);
...
...
@@ -117,9 +129,6 @@ void initLogFile() {
}
void
saveConfigToLogFile
()
{
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
taosFprintfFile
(
g_fp
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
g_fp
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
...
...
@@ -144,10 +153,11 @@ void saveConfigToLogFile() {
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" expect rows: %d
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
}
taosFprintfFile
(
g_fp
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"# Test time: %s
\n
"
,
getCurrentTimeString
(
tmpString
)
);
taosFprintfFile
(
g_fp
,
"###################################################################
\n
"
);
}
...
...
@@ -316,10 +326,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (now, %d, %"
PRId64
", %"
PRId64
", %d)"
,
g_stConfInfo
.
cdbName
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
consumeRowCnt
,
pInfo
->
checkresult
);
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
g_fp
,
"# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
,
sqlStr
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s, consume id %d result: %s
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
,
sqlStr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -339,6 +347,9 @@ void loop_consume(SThreadInfo* pInfo) {
int64_t
totalMsgs
=
0
;
int64_t
totalRows
=
0
;
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s consumer id %d start to loop pull msg
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
);
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
g_stConfInfo
.
consumeDelay
*
1000
);
if
(
tmqMsg
)
{
...
...
@@ -351,11 +362,13 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs
++
;
if
(
totalRows
>=
pInfo
->
expectMsgCnt
)
{
taosFprintfFile
(
g_fp
,
"==== totalRows >= pInfo->expectMsgCnt, so break
\n
"
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s over than expect rows, so break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
break
;
}
}
else
{
taosFprintfFile
(
g_fp
,
"==== delay over time, so break
\n
"
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s no poll more msg when time over, break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
break
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录