Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
794ea5c5
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
794ea5c5
编写于
7月 18, 2022
作者:
H
Hui Li
提交者:
GitHub
7月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15006 from taosdata/test3.0/lihui
test: extent tmqSim
上级
66d3c489
b3fe36a0
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
521 addition
and
4 deletion
+521
-4
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+521
-4
未找到文件。
tests/test/c/tmqSim.c
浏览文件 @
794ea5c5
...
...
@@ -20,6 +20,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <math.h>
#include "taos.h"
#include "taosdef.h"
...
...
@@ -35,6 +36,8 @@
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
#define SEND_TIME_UNIT 10 // ms
#define MAX_SQL_LEN 1048576
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
...
...
@@ -42,6 +45,12 @@ typedef enum {
NOTIFY_CMD_ID_BUTT
,
}
NOTIFY_CMD_ID
;
typedef
enum
enumQUERY_TYPE
{
NO_INSERT_TYPE
,
INSERT_TYPE
,
QUERY_TYPE_BUT
}
QUERY_TYPE
;
typedef
struct
{
TdThread
thread
;
int32_t
consumerId
;
...
...
@@ -58,6 +67,7 @@ typedef struct {
int64_t
consumeMsgCnt
;
int64_t
consumeRowCnt
;
int64_t
consumeLen
;
int32_t
checkresult
;
char
topicString
[
1024
];
...
...
@@ -79,12 +89,17 @@ typedef struct {
TAOS
*
taos
;
// below parameters is used by omb test
int32_t
producerRate
;
// unit: msgs/s
int64_t
totalProduceMsgs
;
int64_t
totalMsgsLen
;
}
SThreadInfo
;
typedef
struct
{
// input from argvs
char
cdbName
[
32
];
char
dbName
[
32
];
char
dbName
[
64
];
int32_t
showMsgFlag
;
int32_t
showRowFlag
;
int32_t
saveRowFlag
;
...
...
@@ -93,11 +108,22 @@ typedef struct {
int32_t
useSnapshot
;
int64_t
nowTime
;
SThreadInfo
stThreads
[
MAX_CONSUMER_THREAD_CNT
];
SThreadInfo
stProdThreads
[
MAX_CONSUMER_THREAD_CNT
];
// below parameters is used by omb test
char
topic
[
64
];
int32_t
producers
;
int32_t
producerRate
;
int32_t
runDurationMinutes
;
int32_t
batchSize
;
int32_t
payloadLen
;
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
;
TdFilePtr
g_fp
=
NULL
;
static
int
running
=
1
;
char
*
g_payload
=
NULL
;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
...
...
@@ -117,7 +143,29 @@ static void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-s"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"saveRowFlag, default is "
,
g_stConfInfo
.
saveRowFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-y"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"consume delay, default is s"
,
g_stConfInfo
.
consumeDelay
);
printf
(
"%s%s%s%ds
\n
"
,
indent
,
indent
,
"consume delay, default is "
,
g_stConfInfo
.
consumeDelay
);
printf
(
"%s%s
\n
"
,
indent
,
"-e"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"snapshot, default is "
,
g_stConfInfo
.
useSnapshot
);
printf
(
"%s%s
\n
"
,
indent
,
"-t"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"topic name, default is null"
);
printf
(
"%s%s
\n
"
,
indent
,
"-x"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"consume thread number, default is 1"
);
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s
\n
"
,
indent
,
"-p"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"producer thread number, default is 0"
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"batch size, default is 1"
);
printf
(
"%s%s
\n
"
,
indent
,
"-i"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"produce rate unit is msgs /s, default is 100000"
);
printf
(
"%s%s
\n
"
,
indent
,
"-n"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"payload len unit is byte, default is 1000"
);
exit
(
EXIT_SUCCESS
);
}
...
...
@@ -144,7 +192,11 @@ void initLogFile() {
pid_t
process_id
=
getpid
();
if
(
0
!=
strlen
(
g_stConfInfo
.
topic
))
{
sprintf
(
filename
,
"/tmp/tmqlog-%d-%s.txt"
,
process_id
,
getCurrentTimeString
(
tmpString
));
}
else
{
sprintf
(
filename
,
"%s/../log/tmqlog-%d-%s.txt"
,
configDir
,
process_id
,
getCurrentTimeString
(
tmpString
));
}
#ifdef WINDOWS
for
(
int
i
=
2
;
i
<
sizeof
(
filename
);
i
++
)
{
if
(
filename
[
i
]
==
':'
)
filename
[
i
]
=
'-'
;
...
...
@@ -199,6 +251,9 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
showRowFlag
=
0
;
g_stConfInfo
.
saveRowFlag
=
0
;
g_stConfInfo
.
consumeDelay
=
5
;
g_stConfInfo
.
numOfThread
=
1
;
g_stConfInfo
.
batchSize
=
1
;
g_stConfInfo
.
producers
=
0
;
g_stConfInfo
.
nowTime
=
taosGetTimestampMs
();
...
...
@@ -222,12 +277,38 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
g_stConfInfo
.
useSnapshot
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
char
tmpBuf
[
56
];
strcpy
(
tmpBuf
,
argv
[
++
i
]);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
}
else
if
(
strcmp
(
argv
[
i
],
"-x"
)
==
0
)
{
g_stConfInfo
.
numOfThread
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
g_stConfInfo
.
runDurationMinutes
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
)
{
g_stConfInfo
.
producers
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-b"
)
==
0
)
{
g_stConfInfo
.
batchSize
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
)
{
g_stConfInfo
.
producerRate
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
g_stConfInfo
.
payloadLen
=
atol
(
argv
[
++
i
]);
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
g_payload
=
taosMemoryCalloc
(
g_stConfInfo
.
payloadLen
+
1
,
1
);
if
(
NULL
==
g_payload
)
{
pPrint
(
"%s failed to malloc for payload %s"
,
GREEN
,
NC
);
exit
(
-
1
);
}
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
payloadLen
;
i
++
)
{
strcpy
(
&
g_payload
[
i
],
"a"
);
}
initLogFile
();
taosFprintfFile
(
g_fp
,
"====parseArgument() success
\n
"
);
...
...
@@ -240,6 +321,11 @@ void parseArgument(int32_t argc, char* argv[]) {
pPrint
(
"%s showMsgFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showMsgFlag
,
NC
);
pPrint
(
"%s showRowFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showRowFlag
,
NC
);
pPrint
(
"%s saveRowFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
saveRowFlag
,
NC
);
pPrint
(
"%s snapshot:%d %s"
,
GREEN
,
g_stConfInfo
.
useSnapshot
,
NC
);
pPrint
(
"%s omb topic:%s %s"
,
GREEN
,
g_stConfInfo
.
topic
,
NC
);
pPrint
(
"%s numOfThread:%d %s"
,
GREEN
,
g_stConfInfo
.
numOfThread
,
NC
);
#endif
}
...
...
@@ -909,8 +995,439 @@ int32_t getConsumeInfo() {
return
0
;
}
static
int32_t
omb_data_msg_process
(
TAOS_RES
*
msg
,
SThreadInfo
*
pInfo
,
int32_t
msgIndex
,
int64_t
*
lenOfRows
)
{
char
buf
[
16
*
1024
];
int32_t
totalRows
=
0
;
int32_t
totalLen
=
0
;
// printf("topic: %s\n", tmq_get_topic_name(msg));
//int32_t vgroupId = tmq_get_vgroup_id(msg);
//const char* dbName = tmq_get_db_name(msg);
//taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
//taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
// tmq_get_topic_name(msg), vgroupId);
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
//int32_t* length = taos_fetch_lengths(msg);
//int32_t precision = taos_result_precision(msg);
//const char* tbName = tmq_get_table_name(msg);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
totalLen
+=
strlen
(
buf
);
totalRows
++
;
}
*
lenOfRows
=
totalLen
;
return
totalRows
;
}
void
omb_loop_consume
(
SThreadInfo
*
pInfo
)
{
int32_t
code
;
int32_t
once_flag
=
0
;
int64_t
totalMsgs
=
0
;
int64_t
totalRows
=
0
;
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s consumer id %d start to loop pull msg
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
);
printf
(
"%s consumer id %d start to loop pull msg
\n
"
,
getCurrentTimeString
(
tmpString
),
pInfo
->
consumerId
);
pInfo
->
ts
=
taosGetTimestampMs
();
int64_t
lastTotalMsgs
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int64_t
totalLenOfMsg
=
0
;
int64_t
lastTotalLenOfMsg
=
0
;
int32_t
consumeDelay
=
g_stConfInfo
.
consumeDelay
==
-
1
?
-
1
:
(
g_stConfInfo
.
consumeDelay
*
1000
);
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
consumeDelay
);
if
(
tmqMsg
)
{
int64_t
lenOfMsg
=
0
;
totalRows
+=
omb_data_msg_process
(
tmqMsg
,
pInfo
,
totalMsgs
,
&
lenOfMsg
);
totalLenOfMsg
+=
lenOfMsg
;
taos_free_result
(
tmqMsg
);
totalMsgs
++
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
int64_t
currentLenOfMsg
=
totalLenOfMsg
-
lastTotalLenOfMsg
;
int64_t
deltaTime
=
currentPrintTime
-
lastPrintTime
;
printf
(
"consumer id %d has currently cons total rows: %"
PRId64
", msgs: %"
PRId64
", rate: %.3f msgs/s, %.1f MB/s
\n
"
,
pInfo
->
consumerId
,
totalRows
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
deltaTime
,
currentLenOfMsg
*
1000
.
0
/
(
1024
*
1024
)
/
deltaTime
);
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
", period cons rate: %.3f msgs/s, %.1f MB/s
\n
"
,
pInfo
->
consumerId
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
deltaTime
,
currentLenOfMsg
*
1000
.
0
/
deltaTime
);
lastPrintTime
=
currentPrintTime
;
lastTotalMsgs
=
totalMsgs
;
lastTotalLenOfMsg
=
totalLenOfMsg
;
}
}
else
{
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s no poll more msg when time over, break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
printf
(
"%s no poll more msg when time over, break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
int64_t
currentPrintTime
=
taosGetTimestampMs
();
int64_t
currentLenOfMsg
=
totalLenOfMsg
-
lastTotalLenOfMsg
;
int64_t
deltaTime
=
currentPrintTime
-
lastPrintTime
;
printf
(
"consumer id %d has currently cons total rows: %"
PRId64
", msgs: %"
PRId64
", rate: %.3f msgs/s, %.1f MB/s
\n
"
,
pInfo
->
consumerId
,
totalRows
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
deltaTime
,
currentLenOfMsg
*
1000
.
0
/
(
1024
*
1024
)
/
deltaTime
);
break
;
}
}
pInfo
->
consumeMsgCnt
=
totalMsgs
;
pInfo
->
consumeRowCnt
=
totalRows
;
pInfo
->
consumeLen
=
totalLenOfMsg
;
}
void
*
ombConsumeThreadFunc
(
void
*
param
)
{
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
//################### set key ########################
tmq_conf_t
*
conf
=
tmq_conf_new
();
// tmq_conf_set(conf, "td.connect.ip", "localhost");
// tmq_conf_set(conf, "td.connect.port", "6030");
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
pInfo
);
tmq_conf_set
(
conf
,
"group.id"
,
"ombCgrp"
);
// tmq_conf_set(conf, "msg.with.table.name", "true");
// tmq_conf_set(conf, "client.id", "c-001");
// tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
// tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
// tmq_conf_set(conf, "auto.offset.reset", "none");
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
//
if
(
g_stConfInfo
.
useSnapshot
)
{
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
}
pInfo
->
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
tmq_conf_destroy
(
conf
);
//################### set topic ##########################
pInfo
->
topicList
=
tmq_list_new
();
tmq_list_append
(
pInfo
->
topicList
,
g_stConfInfo
.
topic
);
if
((
NULL
==
pInfo
->
tmq
)
||
(
NULL
==
pInfo
->
topicList
))
{
taosFprintfFile
(
g_fp
,
"create consumer fail! tmq is null or topicList is null
\n
"
);
assert
(
0
);
return
NULL
;
}
int32_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicList
);
if
(
err
!=
0
)
{
pError
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_subscribe() fail! reason: %s
\n
"
,
tmq_err2str
(
err
));
assert
(
0
);
return
NULL
;
}
tmq_list_destroy
(
pInfo
->
topicList
);
pInfo
->
topicList
=
NULL
;
omb_loop_consume
(
pInfo
);
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
if
(
err
!=
0
)
{
pError
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_unsubscribe()! reason: %s
\n
"
,
tmq_err2str
(
err
));
}
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
!=
0
)
{
pError
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_consumer_close()! reason: %s
\n
"
,
tmq_err2str
(
err
));
}
pInfo
->
tmq
=
NULL
;
return
NULL
;
}
static
int
queryDbExec
(
TAOS
*
taos
,
char
*
command
,
QUERY_TYPE
type
)
{
TAOS_RES
*
res
=
taos_query
(
taos
,
command
);
int32_t
code
=
taos_errno
(
res
);
if
(
code
!=
0
)
{
pPrint
(
"%s Failed to execute <%s>, reason: %s %s"
,
GREEN
,
command
,
taos_errstr
(
res
),
NC
);
taos_free_result
(
res
);
return
-
1
;
}
if
(
INSERT_TYPE
==
type
)
{
int
affectedRows
=
taos_affected_rows
(
res
);
taos_free_result
(
res
);
return
affectedRows
;
}
taos_free_result
(
res
);
return
0
;
}
void
*
ombProduceThreadFunc
(
void
*
param
)
{
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
pInfo
->
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pInfo
->
taos
==
NULL
)
{
printf
(
"taos_connect() fail
\n
"
);
return
NULL
;
}
int64_t
affectedRowsTotal
=
0
;
int64_t
sendMsgs
=
0
;
uint32_t
totalSendLoopTimes
=
g_stConfInfo
.
runDurationMinutes
*
60
*
1000
/
SEND_TIME_UNIT
;
// send some msgs per 10ms
uint32_t
batchPerTblTimes
=
pInfo
->
producerRate
/
100
/
g_stConfInfo
.
batchSize
;
uint32_t
remainder
=
(
pInfo
->
producerRate
/
100
)
%
g_stConfInfo
.
batchSize
;
if
(
remainder
)
{
batchPerTblTimes
+=
1
;
}
char
*
sqlBuf
=
taosMemoryMalloc
(
MAX_SQL_LEN
);
if
(
NULL
==
sqlBuf
)
{
printf
(
"malloc fail for sqlBuf
\n
"
);
return
NULL
;
}
printf
(
"Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d
\n
"
,
totalSendLoopTimes
,
batchPerTblTimes
,
pInfo
->
producerRate
);
char
ctbName
[
64
]
=
{
0
};
sprintf
(
ctbName
,
"%s.ctb%d"
,
g_stConfInfo
.
dbName
,
pInfo
->
consumerId
);
int64_t
lastPrintTime
=
taosGetTimestampUs
();
int64_t
totalMsgLen
=
0
;
//int64_t timeStamp = taosGetTimestampUs();
while
(
totalSendLoopTimes
)
{
int64_t
startTs
=
taosGetTimestampUs
();
for
(
int
i
=
0
;
i
<
batchPerTblTimes
;
++
i
)
{
uint32_t
msgsOfSql
=
g_stConfInfo
.
batchSize
;
if
((
i
==
batchPerTblTimes
-
1
)
&&
(
0
!=
remainder
))
{
msgsOfSql
=
remainder
;
}
int
len
=
0
;
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"insert into %s values "
,
ctbName
);
for
(
int
j
=
0
;
j
<
msgsOfSql
;
j
++
)
{
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
sendMsgs
++
;
pInfo
->
totalProduceMsgs
++
;
}
totalMsgLen
+=
len
;
pInfo
->
totalMsgsLen
+=
len
;
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
if
(
affectedRows
<
0
)
{
return
NULL
;
}
affectedRowsTotal
+=
affectedRows
;
//printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
}
totalSendLoopTimes
-=
1
;
// calc spent time
int64_t
currentTs
=
taosGetTimestampUs
();
int64_t
delta
=
currentTs
-
startTs
;
if
(
delta
<
SEND_TIME_UNIT
*
1000
)
{
int64_t
sleepLen
=
(
int32_t
)(
SEND_TIME_UNIT
*
1000
-
delta
);
//printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
taosUsleep
((
int32_t
)
sleepLen
);
}
currentTs
=
taosGetTimestampUs
();
delta
=
currentTs
-
lastPrintTime
;
if
(
delta
>
10
*
1000
*
1000
)
{
printf
(
"producer[%d] info: %"
PRId64
" msgs, %"
PRId64
" Byte, %"
PRId64
" us, totalSendLoopTimes: %d
\n
"
,
pInfo
->
consumerId
,
sendMsgs
,
totalMsgLen
,
delta
,
totalSendLoopTimes
);
printf
(
"producer[%d] rate: %1.f msgs/s, %1.f KB/s
\n
"
,
pInfo
->
consumerId
,
sendMsgs
*
1000
.
0
*
1000
/
delta
,
(
totalMsgLen
/
1024
.
0
)
/
(
delta
/
(
1000
*
1000
)));
lastPrintTime
=
currentTs
;
sendMsgs
=
0
;
totalMsgLen
=
0
;
}
}
printf
(
"affectedRowsTotal: %"
PRId64
"
\n
"
,
affectedRowsTotal
);
return
NULL
;
}
void
printProduceInfo
(
int64_t
start
)
{
int64_t
totalMsgs
=
0
;
int64_t
totalLenOfMsgs
=
0
;
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
producers
;
i
++
)
{
totalMsgs
+=
g_stConfInfo
.
stProdThreads
[
i
].
totalProduceMsgs
;
totalLenOfMsgs
+=
g_stConfInfo
.
stProdThreads
[
i
].
totalMsgsLen
;
}
int64_t
end
=
taosGetTimestampUs
();
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
printf
(
"Spent %.3f seconds to prod %"
PRIu64
" msgs, %"
PRIu64
" Byte
\n\n
"
,
tInMs
,
totalMsgs
,
totalLenOfMsgs
);
printf
(
"Spent %.3f seconds to prod %"
PRIu64
" msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
producers
,
(
double
)
totalMsgs
/
tInMs
,
(
double
)
totalLenOfMsgs
/
(
1024
.
0
*
1024
)
/
tInMs
);
return
;
}
void
startOmbConsume
()
{
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
0
!=
g_stConfInfo
.
producers
)
{
TAOS
*
taos
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
taos
==
NULL
)
{
taosFprintfFile
(
g_fp
,
"taos_connect() fail, can not notify and save consume result to main scripte
\n
"
);
ASSERT
(
0
);
return
;
}
char
stbName
[
16
]
=
"stb"
;
char
ctbPrefix
[
16
]
=
"ctb"
;
char
sql
[
256
]
=
{
0
};
sprintf
(
sql
,
"drop database if exists %s"
,
g_stConfInfo
.
dbName
);
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
sprintf
(
sql
,
"create database if not exists %s precision 'ns' vgroups %d"
,
g_stConfInfo
.
dbName
,
g_stConfInfo
.
producers
);
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
sprintf
(
sql
,
"create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) "
,
g_stConfInfo
.
dbName
,
stbName
,
g_stConfInfo
.
payloadLen
);
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
producers
;
i
++
)
{
sprintf
(
sql
,
"create table %s.%s%d using %s.stb tags(%d) "
,
g_stConfInfo
.
dbName
,
ctbPrefix
,
i
,
g_stConfInfo
.
dbName
,
i
);
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
}
// create topic
sprintf
(
sql
,
"create topic %s as stable %s.%s"
,
g_stConfInfo
.
topic
,
g_stConfInfo
.
dbName
,
stbName
);
printf
(
"SQL: %s
\n
"
,
sql
);
queryDbExec
(
taos
,
sql
,
NO_INSERT_TYPE
);
int32_t
producerRate
=
ceil
(
g_stConfInfo
.
producerRate
/
g_stConfInfo
.
producers
);
printf
(
"==== create %d produce thread ====
\n
"
,
g_stConfInfo
.
producers
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
producers
;
++
i
)
{
g_stConfInfo
.
stProdThreads
[
i
].
consumerId
=
i
;
g_stConfInfo
.
stProdThreads
[
i
].
producerRate
=
producerRate
;
taosThreadCreate
(
&
(
g_stConfInfo
.
stProdThreads
[
i
].
thread
),
&
thattr
,
ombProduceThreadFunc
,
(
void
*
)(
&
(
g_stConfInfo
.
stProdThreads
[
i
])));
}
if
(
0
==
g_stConfInfo
.
numOfThread
)
{
int64_t
start
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
producers
;
i
++
)
{
taosThreadJoin
(
g_stConfInfo
.
stProdThreads
[
i
].
thread
,
NULL
);
taosThreadClear
(
&
g_stConfInfo
.
stProdThreads
[
i
].
thread
);
}
printProduceInfo
(
start
);
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
return
;
}
}
// pthread_create one thread to consume
taosFprintfFile
(
g_fp
,
"==== create %d consume thread ====
\n
"
,
g_stConfInfo
.
numOfThread
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
++
i
)
{
g_stConfInfo
.
stThreads
[
i
].
consumerId
=
i
;
taosThreadCreate
(
&
(
g_stConfInfo
.
stThreads
[
i
].
thread
),
&
thattr
,
ombConsumeThreadFunc
,
(
void
*
)(
&
(
g_stConfInfo
.
stThreads
[
i
])));
}
int64_t
start
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosThreadJoin
(
g_stConfInfo
.
stThreads
[
i
].
thread
,
NULL
);
taosThreadClear
(
&
g_stConfInfo
.
stThreads
[
i
].
thread
);
}
int64_t
end
=
taosGetTimestampUs
();
int64_t
totalRows
=
0
;
int64_t
totalMsgs
=
0
;
int64_t
totalLenOfMsgs
=
0
;
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
totalMsgs
+=
g_stConfInfo
.
stThreads
[
i
].
consumeMsgCnt
;
totalLenOfMsgs
+=
g_stConfInfo
.
stThreads
[
i
].
consumeLen
;
totalRows
+=
g_stConfInfo
.
stThreads
[
i
].
consumeRowCnt
;
}
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
taosFprintfFile
(
g_fp
,
"Spent %.3f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
),
(
double
)
totalLenOfMsgs
/
(
1024
*
1024
)
/
tInMs
);
printf
(
"Spent %.3f seconds to cons rows: %"
PRIu64
" msgs: %"
PRIu64
" with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s
\n\n
"
,
tInMs
,
totalRows
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
),
(
double
)
totalLenOfMsgs
/
(
1024
*
1024
)
/
tInMs
);
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
return
;
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
if
(
0
!=
strlen
(
g_stConfInfo
.
topic
))
{
startOmbConsume
();
return
0
;
}
getConsumeInfo
();
saveConfigToLogFile
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录