Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9b9e4827
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9b9e4827
编写于
3月 07, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-11463-3.0
上级
f3b2dc89
611a2c62
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
706 addition
and
1 deletion
+706
-1
.gitignore
.gitignore
+0
-1
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+706
-0
未找到文件。
.gitignore
浏览文件 @
9b9e4827
...
...
@@ -24,7 +24,6 @@ mac/
*.orig
src/connector/nodejs/node_modules/
src/connector/nodejs/out/
tests/test/
tests/taoshebei/
tests/taoscsv/
tests/taosdalipu/
...
...
tests/test/c/tmqDemo.c
0 → 100644
浏览文件 @
9b9e4827
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <dirent.h>
#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
#define GREEN "\033[1;32m"
#define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
enum
_RUN_MODE
{
TMQ_RUN_INSERT_AND_CONSUME
,
TMQ_RUN_ONLY_INSERT
,
TMQ_RUN_ONLY_CONSUME
,
TMQ_RUN_MODE_BUTT
};
typedef
struct
{
char
dbName
[
32
];
char
stbName
[
64
];
char
resultFileName
[
256
];
char
vnodeWalPath
[
256
];
int32_t
numOfThreads
;
int32_t
numOfTables
;
int32_t
numOfVgroups
;
int32_t
runMode
;
int32_t
numOfColumn
;
double
ratio
;
int32_t
batchNumOfRow
;
int32_t
totalRowsOfPerTbl
;
int64_t
startTimestamp
;
int32_t
showMsgFlag
;
int32_t
totalRowsOfT2
;
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
=
{
"tmqdb"
,
"stb"
,
"./tmqResult.txt"
,
// output_file
"/data2/dnode/data/vnode/vnode2/wal"
,
1
,
// threads
1
,
// tables
1
,
// vgroups
0
,
// run mode
1
,
// columns
1
,
// ratio
1
,
// batch size
10000
,
// total rows for per table
0
,
// 2020-01-01 00:00:00.000
0
,
// show consume msg switch
10000
,
};
char
*
g_pRowValue
=
NULL
;
TdFilePtr
g_fp
=
NULL
;
static
void
printHelp
()
{
char
indent
[
10
]
=
" "
;
printf
(
"Used to test the performance while create table
\n
"
);
printf
(
"%s%s
\n
"
,
indent
,
"-c"
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"Configuration directory, default is "
,
configDir
);
printf
(
"%s%s
\n
"
,
indent
,
"-d"
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"The name of the database to be created, default is "
,
g_stConfInfo
.
dbName
);
printf
(
"%s%s
\n
"
,
indent
,
"-s"
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"The name of the super table to be created, default is "
,
g_stConfInfo
.
stbName
);
printf
(
"%s%s
\n
"
,
indent
,
"-f"
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"The file of result, default is "
,
g_stConfInfo
.
resultFileName
);
printf
(
"%s%s
\n
"
,
indent
,
"-w"
);
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"The path of vnode of wal, default is "
,
g_stConfInfo
.
vnodeWalPath
);
printf
(
"%s%s
\n
"
,
indent
,
"-t"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfThreads, default is "
,
g_stConfInfo
.
numOfThreads
);
printf
(
"%s%s
\n
"
,
indent
,
"-n"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfTables, default is "
,
g_stConfInfo
.
numOfTables
);
printf
(
"%s%s
\n
"
,
indent
,
"-v"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfVgroups, default is "
,
g_stConfInfo
.
numOfVgroups
);
printf
(
"%s%s
\n
"
,
indent
,
"-a"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"runMode, default is "
,
g_stConfInfo
.
runMode
);
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"numOfColumn, default is "
,
g_stConfInfo
.
numOfColumn
);
printf
(
"%s%s
\n
"
,
indent
,
"-q"
);
printf
(
"%s%s%s%f
\n
"
,
indent
,
indent
,
"ratio, default is "
,
g_stConfInfo
.
ratio
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"batchNumOfRow, default is "
,
g_stConfInfo
.
batchNumOfRow
);
printf
(
"%s%s
\n
"
,
indent
,
"-r"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"totalRowsOfPerTbl, default is "
,
g_stConfInfo
.
totalRowsOfPerTbl
);
printf
(
"%s%s
\n
"
,
indent
,
"-m"
);
printf
(
"%s%s%s%"
PRId64
"
\n
"
,
indent
,
indent
,
"startTimestamp, default is "
,
g_stConfInfo
.
startTimestamp
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
exit
(
EXIT_SUCCESS
);
}
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
g_stConfInfo
.
startTimestamp
=
1640966400000
;
// 2020-01-01 00:00:00.000
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-h"
)
==
0
||
strcmp
(
argv
[
i
],
"--help"
)
==
0
)
{
printHelp
();
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
stbName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
vnodeWalPath
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-f"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
resultFileName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
g_stConfInfo
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
g_stConfInfo
.
numOfTables
=
atoll
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-v"
)
==
0
)
{
g_stConfInfo
.
numOfVgroups
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-a"
)
==
0
)
{
g_stConfInfo
.
runMode
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-b"
)
==
0
)
{
g_stConfInfo
.
batchNumOfRow
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
g_stConfInfo
.
totalRowsOfPerTbl
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
g_stConfInfo
.
numOfColumn
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-q"
)
==
0
)
{
g_stConfInfo
.
ratio
=
atof
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
)
{
g_stConfInfo
.
startTimestamp
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
{
pPrint
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
g_stConfInfo
.
totalRowsOfT2
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
ratio
;
pPrint
(
"%s configDir:%s %s"
,
GREEN
,
configDir
,
NC
);
pPrint
(
"%s dbName:%s %s"
,
GREEN
,
g_stConfInfo
.
dbName
,
NC
);
pPrint
(
"%s stbName:%s %s"
,
GREEN
,
g_stConfInfo
.
stbName
,
NC
);
pPrint
(
"%s resultFileName:%s %s"
,
GREEN
,
g_stConfInfo
.
resultFileName
,
NC
);
pPrint
(
"%s vnodeWalPath:%s %s"
,
GREEN
,
g_stConfInfo
.
vnodeWalPath
,
NC
);
pPrint
(
"%s numOfTables:%d %s"
,
GREEN
,
g_stConfInfo
.
numOfTables
,
NC
);
pPrint
(
"%s numOfThreads:%d %s"
,
GREEN
,
g_stConfInfo
.
numOfThreads
,
NC
);
pPrint
(
"%s numOfVgroups:%d %s"
,
GREEN
,
g_stConfInfo
.
numOfVgroups
,
NC
);
pPrint
(
"%s runMode:%d %s"
,
GREEN
,
g_stConfInfo
.
runMode
,
NC
);
pPrint
(
"%s ratio:%f %s"
,
GREEN
,
g_stConfInfo
.
ratio
,
NC
);
pPrint
(
"%s numOfColumn:%d %s"
,
GREEN
,
g_stConfInfo
.
numOfColumn
,
NC
);
pPrint
(
"%s batchNumOfRow:%d %s"
,
GREEN
,
g_stConfInfo
.
batchNumOfRow
,
NC
);
pPrint
(
"%s totalRowsOfPerTbl:%d %s"
,
GREEN
,
g_stConfInfo
.
totalRowsOfPerTbl
,
NC
);
pPrint
(
"%s totalRowsOfT2:%d %s"
,
GREEN
,
g_stConfInfo
.
totalRowsOfT2
,
NC
);
pPrint
(
"%s startTimestamp:%"
PRId64
" %s"
,
GREEN
,
g_stConfInfo
.
startTimestamp
,
NC
);
pPrint
(
"%s showMsgFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showMsgFlag
,
NC
);
}
static
int
running
=
1
;
static
void
msg_process
(
tmq_message_t
*
message
)
{
tmqShowMsg
(
message
);
}
// calc dir size (not include itself 4096Byte)
int64_t
getDirectorySize
(
char
*
dir
)
{
DIR
*
dp
;
struct
dirent
*
entry
;
int64_t
totalSize
=
0
;
if
((
dp
=
opendir
(
dir
))
==
NULL
)
{
fprintf
(
stderr
,
"Cannot open dir: %s
\n
"
,
dir
);
return
-
1
;
}
//lstat(dir, &statbuf);
//totalSize+=statbuf.st_size;
while
((
entry
=
readdir
(
dp
))
!=
NULL
)
{
char
subdir
[
1024
];
sprintf
(
subdir
,
"%s/%s"
,
dir
,
entry
->
d_name
);
//printf("===d_name: %s\n", entry->d_name);
if
(
taosIsDir
(
subdir
))
{
if
(
strcmp
(
"."
,
entry
->
d_name
)
==
0
||
strcmp
(
".."
,
entry
->
d_name
)
==
0
)
{
continue
;
}
int64_t
subDirSize
=
getDirectorySize
(
subdir
);
totalSize
+=
subDirSize
;
}
else
if
(
0
==
strcmp
(
strchr
(
entry
->
d_name
,
'.'
),
".log"
))
{
// only calc .log file size, and not include .idx file
int64_t
file_size
=
0
;
taosStatFile
(
subdir
,
&
file_size
,
NULL
);
totalSize
+=
file_size
;
}
}
closedir
(
dp
);
return
totalSize
;
}
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
TAOS_RES
*
pRes
=
taos_query
(
taos
,
command
);
int
code
=
taos_errno
(
pRes
);
//if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
if
(
code
!=
0
)
{
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
taos_free_result
(
pRes
);
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
int32_t
init_env
()
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
sprintf
(
sqlStr
,
"create database if not exists %s vgroups %d"
,
g_stConfInfo
.
dbName
,
g_stConfInfo
.
numOfVgroups
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
sprintf
(
sqlStr
,
"use %s"
,
g_stConfInfo
.
dbName
);
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
// create row value
g_pRowValue
=
(
char
*
)
calloc
(
1
,
g_stConfInfo
.
numOfColumn
*
16
+
128
);
if
(
NULL
==
g_pRowValue
)
{
return
-
1
;
}
int32_t
dataLen
=
0
;
int32_t
sqlLen
=
0
;
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"create stable if not exists %s (ts timestamp, "
,
g_stConfInfo
.
stbName
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfColumn
;
i
++
)
{
if
(
i
==
g_stConfInfo
.
numOfColumn
-
1
)
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int) "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899"
,
strlen
(
"66778899"
));
dataLen
+=
strlen
(
"66778899"
);
}
else
{
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"c%d int, "
,
i
);
memcpy
(
g_pRowValue
+
dataLen
,
"66778899, "
,
strlen
(
"66778899, "
));
dataLen
+=
strlen
(
"66778899, "
);
}
}
sqlLen
+=
sprintf
(
sqlStr
+
sqlLen
,
"tags (t0 int)"
);
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table %s, reason:%s
\n
"
,
g_stConfInfo
.
stbName
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfTables
;
i
++
)
{
sprintf
(
sqlStr
,
"create table if not exists %s%d using %s tags(1)"
,
g_stConfInfo
.
stbName
,
i
,
g_stConfInfo
.
stbName
);
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table %s%d, reason:%s
\n
"
,
g_stConfInfo
.
stbName
,
i
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
}
//const char* sql = "select * from tu1";
sprintf
(
sqlStr
,
"select * from %s%d"
,
g_stConfInfo
.
stbName
,
0
);
pRes
=
tmq_create_topic
(
pConn
,
"test_stb_topic_1"
,
sqlStr
,
strlen
(
sqlStr
));
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic test_stb_topic_1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
tmq_t
*
build_consumer
()
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
sprintf
(
sqlStr
,
"use %s"
,
g_stConfInfo
.
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"test_stb_topic_1"
);
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1000
;
int
msg_count
=
0
;
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
return
;
}
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
if
((
++
msg_count
%
MIN_COMMIT_COUNT
)
==
0
)
tmq_commit
(
tmq
,
NULL
,
0
);
}
}
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
perf_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
,
int32_t
totalMsgs
,
int64_t
walLogSize
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
batchCnt
=
0
;
int32_t
skipLogNum
=
0
;
int64_t
startTime
=
taosGetTimestampUs
();
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
batchCnt
++
;
skipLogNum
+=
tmqGetSkipLogNum
(
tmqmessage
);
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
msg_process
(
tmqmessage
);
}
tmq_message_destroy
(
tmqmessage
);
}
else
{
break
;
}
}
int64_t
endTime
=
taosGetTimestampUs
();
double
consumeTime
=
(
double
)(
endTime
-
startTime
)
/
1000000
;
if
(
batchCnt
!=
totalMsgs
)
{
pPrint
(
"%s inserted msgs: %d and consume msgs: %d mismatch %s"
,
GREEN
,
totalMsgs
,
batchCnt
,
NC
);
}
pPrint
(
"consume result: msgs: %d, skip log cnt: %d, time used:%.3f second
\n
"
,
batchCnt
,
skipLogNum
,
consumeTime
);
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.2f| %10.2f |
\n
"
,
batchCnt
,
consumeTime
,
(
double
)
batchCnt
/
consumeTime
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
)
/
consumeTime
,
(
double
)
walLogSize
/
1024
.
0
/
batchCnt
);
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
{
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
}
}
// sync insertion
int32_t
syncWriteData
()
{
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
char
sqlStr
[
1024
]
=
{
0
};
sprintf
(
sqlStr
,
"use %s"
,
g_stConfInfo
.
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
char
*
buffer
=
NULL
;
buffer
=
(
char
*
)
malloc
(
MAX_SQL_STR_LEN
);
if
(
NULL
==
buffer
)
{
return
-
1
;
}
int32_t
totalMsgs
=
0
;
int64_t
time_counter
=
g_stConfInfo
.
startTimestamp
;
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
totalRowsOfPerTbl
;)
{
for
(
int
tID
=
0
;
tID
<=
g_stConfInfo
.
numOfTables
-
1
;
tID
++
)
{
int
inserted
=
i
;
int64_t
tmp_time
=
time_counter
;
int32_t
data_len
=
0
;
data_len
+=
sprintf
(
buffer
+
data_len
,
"insert into %s%d values"
,
g_stConfInfo
.
stbName
,
tID
);
int
k
;
for
(
k
=
0
;
k
<
g_stConfInfo
.
batchNumOfRow
;)
{
data_len
+=
sprintf
(
buffer
+
data_len
,
"(%"
PRId64
", %s) "
,
tmp_time
++
,
g_pRowValue
);
inserted
++
;
k
++
;
if
(
inserted
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
break
;
}
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
break
;
}
}
int
code
=
queryDB
(
pConn
,
buffer
);
if
(
0
!=
code
){
fprintf
(
stderr
,
"insert data error!
\n
"
);
tfree
(
buffer
);
return
-
1
;
}
totalMsgs
++
;
if
(
tID
==
g_stConfInfo
.
numOfTables
-
1
)
{
i
=
inserted
;
time_counter
=
tmp_time
;
}
}
}
tfree
(
buffer
);
return
totalMsgs
;
}
// sync insertion
int32_t
syncWriteDataByRatio
()
{
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
char
sqlStr
[
1024
]
=
{
0
};
sprintf
(
sqlStr
,
"use %s"
,
g_stConfInfo
.
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
char
*
buffer
=
NULL
;
buffer
=
(
char
*
)
malloc
(
MAX_SQL_STR_LEN
);
if
(
NULL
==
buffer
)
{
return
-
1
;
}
int32_t
totalMsgs
=
0
;
int32_t
insertedOfT1
=
0
;
int32_t
insertedOfT2
=
0
;
int64_t
tsOfT1
=
g_stConfInfo
.
startTimestamp
;
int64_t
tsOfT2
=
g_stConfInfo
.
startTimestamp
;
int64_t
tmp_time
;
for
(;;)
{
if
((
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
&&
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
))
{
break
;
}
for
(
int
tID
=
0
;
tID
<=
g_stConfInfo
.
numOfTables
-
1
;
tID
++
)
{
if
(
0
==
tID
)
{
tmp_time
=
tsOfT1
;
if
(
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
continue
;
}
}
else
if
(
1
==
tID
){
tmp_time
=
tsOfT2
;
if
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
)
{
continue
;
}
}
int32_t
data_len
=
0
;
data_len
+=
sprintf
(
buffer
+
data_len
,
"insert into %s%d values"
,
g_stConfInfo
.
stbName
,
tID
);
int
k
;
for
(
k
=
0
;
k
<
g_stConfInfo
.
batchNumOfRow
;)
{
data_len
+=
sprintf
(
buffer
+
data_len
,
"(%"
PRId64
", %s) "
,
tmp_time
++
,
g_pRowValue
);
k
++
;
if
(
0
==
tID
)
{
insertedOfT1
++
;
if
(
insertedOfT1
>=
g_stConfInfo
.
totalRowsOfPerTbl
)
{
break
;
}
}
else
if
(
1
==
tID
){
insertedOfT2
++
;
if
(
insertedOfT2
>=
g_stConfInfo
.
totalRowsOfT2
)
{
break
;
}
}
if
(
data_len
>
MAX_SQL_STR_LEN
-
MAX_ROW_STR_LEN
)
{
break
;
}
}
int
code
=
queryDB
(
pConn
,
buffer
);
if
(
0
!=
code
){
fprintf
(
stderr
,
"insert data error!
\n
"
);
tfree
(
buffer
);
return
-
1
;
}
if
(
0
==
tID
)
{
tsOfT1
=
tmp_time
;
}
else
if
(
1
==
tID
){
tsOfT2
=
tmp_time
;
}
totalMsgs
++
;
}
}
pPrint
(
"expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
,
g_stConfInfo
.
totalRowsOfT2
,
insertedOfT1
,
insertedOfT2
);
tfree
(
buffer
);
return
totalMsgs
;
}
void
printParaIntoFile
()
{
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr
pFile
=
taosOpenFile
(
g_stConfInfo
.
resultFileName
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
g_stConfInfo
.
resultFileName
);
exit
-
1
;
};
g_fp
=
pFile
;
time_t
tTime
=
time
(
NULL
);
struct
tm
tm
=
*
localtime
(
&
tTime
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
pFile
,
"# stbName: %s
\n
"
,
g_stConfInfo
.
stbName
);
taosFprintfFile
(
pFile
,
"# vnodeWalPath: %s
\n
"
,
g_stConfInfo
.
vnodeWalPath
);
taosFprintfFile
(
pFile
,
"# numOfTables: %d
\n
"
,
g_stConfInfo
.
numOfTables
);
taosFprintfFile
(
pFile
,
"# numOfThreads: %d
\n
"
,
g_stConfInfo
.
numOfThreads
);
taosFprintfFile
(
pFile
,
"# numOfVgroups: %d
\n
"
,
g_stConfInfo
.
numOfVgroups
);
taosFprintfFile
(
pFile
,
"# runMode: %d
\n
"
,
g_stConfInfo
.
runMode
);
taosFprintfFile
(
pFile
,
"# ratio: %f
\n
"
,
g_stConfInfo
.
ratio
);
taosFprintfFile
(
pFile
,
"# numOfColumn: %d
\n
"
,
g_stConfInfo
.
numOfColumn
);
taosFprintfFile
(
pFile
,
"# batchNumOfRow: %d
\n
"
,
g_stConfInfo
.
batchNumOfRow
);
taosFprintfFile
(
pFile
,
"# totalRowsOfPerTbl: %d
\n
"
,
g_stConfInfo
.
totalRowsOfPerTbl
);
taosFprintfFile
(
pFile
,
"# totalRowsOfT2: %d
\n
"
,
g_stConfInfo
.
totalRowsOfT2
);
taosFprintfFile
(
pFile
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|
\n
"
);
taosFprintfFile
(
pFile
,
"|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |
\n
"
);
taosFprintfFile
(
g_fp
,
"|%10d"
,
g_stConfInfo
.
batchNumOfRow
);
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
printParaIntoFile
();
int64_t
walLogSize
=
0
;
int
code
;
code
=
init_env
();
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"%% init_env error!
\n
"
);
return
-
1
;
}
int32_t
totalMsgs
=
0
;
if
(
g_stConfInfo
.
runMode
!=
TMQ_RUN_ONLY_CONSUME
)
{
int64_t
startTs
=
taosGetTimestampUs
();
if
(
1
==
g_stConfInfo
.
ratio
)
{
totalMsgs
=
syncWriteData
();
}
else
{
totalMsgs
=
syncWriteDataByRatio
();
}
if
(
totalMsgs
<=
0
)
{
pError
(
"inset data error!
\n
"
);
return
-
1
;
}
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
int32_t
totalRows
=
0
;
if
(
1
==
g_stConfInfo
.
ratio
)
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
g_stConfInfo
.
numOfTables
;
}
else
{
totalRows
=
g_stConfInfo
.
totalRowsOfPerTbl
*
(
1
+
g_stConfInfo
.
ratio
);
}
float
seconds
=
delay
/
1000000
.
0
;
float
rowsSpeed
=
totalRows
/
seconds
;
float
msgsSpeed
=
totalMsgs
/
seconds
;
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
pError
(
"vnode2/wal size incorrect!"
);
}
else
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
pPrint
(
"insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second
\n
"
,
totalRows
,
totalMsgs
,
seconds
,
rowsSpeed
,
msgsSpeed
);
taosFprintfFile
(
g_fp
,
"|%10d | %10.3f | %8.2f | %10.3f "
,
totalMsgs
,
seconds
,
msgsSpeed
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
}
if
(
g_stConfInfo
.
runMode
==
TMQ_RUN_ONLY_INSERT
)
{
return
0
;
}
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
)){
return
-
1
;
}
perf_loop
(
tmq
,
topic_list
,
totalMsgs
,
walLogSize
);
tfree
(
g_pRowValue
);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosCloseFile
(
&
g_fp
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录