Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fd5fd885
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
fd5fd885
编写于
3月 09, 2021
作者:
sangshuduo
提交者:
GitHub
3月 09, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #5393 from taosdata/hotfix/sangshuduo/TD-3147-insert-rate-more-than-1s
Hotfix/sangshuduo/td 3147 insert rate more than 1s
上级
964f5c5a
255b77af
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
159 addition
and
153 deletion
+159
-153
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+159
-153
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
fd5fd885
...
...
@@ -185,6 +185,7 @@ typedef struct SArguments_S {
bool
insert_only
;
bool
answer_yes
;
bool
debug_print
;
bool
verbose_print
;
char
*
output_file
;
int
mode
;
char
*
datatype
[
MAX_NUM_DATATYPE
+
1
];
...
...
@@ -489,6 +490,7 @@ SArguments g_args = {
false
,
// use_metric
false
,
// insert_only
false
,
// debug_print
false
,
// verbose_print
false
,
// answer_yes;
"./output.txt"
,
// output_file
0
,
// mode : sync or async
...
...
@@ -526,7 +528,11 @@ static SQueryMetaInfo g_queryInfo;
static
FILE
*
g_fpOfInsertResult
=
NULL
;
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0)
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0)
///////////////////////////////////////////////////
void
printHelp
()
{
...
...
@@ -691,6 +697,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments
->
answer_yes
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
arguments
->
debug_print
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-gg"
)
==
0
)
{
arguments
->
verbose_print
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-O"
)
==
0
)
{
...
...
@@ -748,7 +756,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf
(
"
\n
"
);
}
printf
(
"# Insertion interval: %d
\n
"
,
arguments
->
insert_interval
);
printf
(
"# Number of
Columns per record:
%d
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Number of
records per req:
%d
\n
"
,
arguments
->
num_of_RPR
);
printf
(
"# Number of Threads: %d
\n
"
,
arguments
->
num_of_threads
);
printf
(
"# Number of Tables: %d
\n
"
,
arguments
->
num_of_tables
);
printf
(
"# Number of Data per Table: %d
\n
"
,
arguments
->
num_of_DPT
);
...
...
@@ -805,7 +813,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
}
if
(
code
!=
0
)
{
debugPrint
(
"
DEBUG
%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
debugPrint
(
"%s() LN%d - command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
fprintf
(
stderr
,
"Failed to run %s, reason: %s
\n
"
,
command
,
taos_errstr
(
res
));
taos_free_result
(
res
);
//taos_close(taos);
...
...
@@ -1986,7 +1994,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
exit
(
-
1
);
}
snprintf
(
superTbls
->
colsOfCreateChildTable
,
len
+
20
,
"(ts timestamp%s)"
,
cols
);
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
superTbls
->
colsOfCreateChildTable
);
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
superTbls
->
colsOfCreateChildTable
);
if
(
use_metric
)
{
char
tags
[
STRING_LEN
]
=
"
\0
"
;
...
...
@@ -2039,13 +2047,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
snprintf
(
command
,
BUFFER_SIZE
,
"create table if not exists %s.%s (ts timestamp%s) tags %s"
,
dbName
,
superTbls
->
sTblName
,
cols
,
tags
);
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
fprintf
(
stderr
,
"create supertable %s failed!
\n\n
"
,
superTbls
->
sTblName
);
return
-
1
;
}
debugPrint
(
"
DEBUG -
create supertable %s success!
\n\n
"
,
superTbls
->
sTblName
);
debugPrint
(
"create supertable %s success!
\n\n
"
,
superTbls
->
sTblName
);
}
return
0
;
}
...
...
@@ -2064,7 +2072,7 @@ static int createDatabases() {
for
(
int
i
=
0
;
i
<
g_Dbs
.
dbCount
;
i
++
)
{
if
(
g_Dbs
.
db
[
i
].
drop
)
{
sprintf
(
command
,
"drop database if exists %s;"
,
g_Dbs
.
db
[
i
].
dbName
);
debugPrint
(
"DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
taos_close
(
taos
);
return
-
1
;
...
...
@@ -2132,7 +2140,7 @@ static int createDatabases() {
"precision
\'
%s
\'
;"
,
g_Dbs
.
db
[
i
].
dbCfg
.
precision
);
}
debugPrint
(
"
DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
debugPrint
(
"%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
taos_close
(
taos
);
printf
(
"
\n
create database %s failed!
\n\n
"
,
g_Dbs
.
db
[
i
].
dbName
);
...
...
@@ -2140,11 +2148,11 @@ static int createDatabases() {
}
printf
(
"
\n
create database %s success!
\n\n
"
,
g_Dbs
.
db
[
i
].
dbName
);
debugPrint
(
"
DEBUG
%s() %d supertbl count:%d
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTblCount
);
debugPrint
(
"%s() %d supertbl count:%d
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTblCount
);
for
(
int
j
=
0
;
j
<
g_Dbs
.
db
[
i
].
superTblCount
;
j
++
)
{
// describe super table, if exists
sprintf
(
command
,
"describe %s.%s;"
,
g_Dbs
.
db
[
i
].
dbName
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
sTblName
);
debugPrint
(
"DEBUG
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
verbosePrint
(
"
%s() %d command: %s
\n
"
,
__func__
,
__LINE__
,
command
);
if
(
0
!=
queryDbExec
(
taos
,
command
,
NO_INSERT_TYPE
))
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
superTblExists
=
TBL_NO_EXISTS
;
ret
=
createSuperTable
(
taos
,
g_Dbs
.
db
[
i
].
dbName
,
&
g_Dbs
.
db
[
i
].
superTbls
[
j
],
g_Dbs
.
use_metric
);
...
...
@@ -2232,7 +2240,7 @@ static void* createTable(void *sarg)
}
len
=
0
;
debugPrint
(
"DEBUG
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
buffer
,
NO_INSERT_TYPE
)){
free
(
buffer
);
return
NULL
;
...
...
@@ -2247,7 +2255,7 @@ static void* createTable(void *sarg)
}
if
(
0
!=
len
)
{
debugPrint
(
"DEBUG
%s() %d buffer: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() %d buffer: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
(
void
)
queryDbExec
(
winfo
->
taos
,
buffer
,
NO_INSERT_TYPE
);
}
...
...
@@ -2285,7 +2293,7 @@ int startMultiThreadCreateChildTable(
t_info
->
threadID
=
i
;
tstrncpy
(
t_info
->
db_name
,
db_name
,
MAX_DB_NAME_SIZE
);
t_info
->
superTblInfo
=
superTblInfo
;
debugPrint
(
"DEBUG
%s() %d db_name: %s
\n
"
,
__func__
,
__LINE__
,
db_name
);
verbosePrint
(
"
%s() %d db_name: %s
\n
"
,
__func__
,
__LINE__
,
db_name
);
t_info
->
taos
=
taos_connect
(
g_Dbs
.
host
,
g_Dbs
.
user
,
...
...
@@ -2336,7 +2344,7 @@ static void createChildTables() {
continue
;
}
debugPrint
(
"DEBUG -
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
verbosePrint
(
"
%s() LN%d: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
);
startMultiThreadCreateChildTable
(
g_Dbs
.
db
[
i
].
superTbls
[
j
].
colsOfCreateChildTable
,
...
...
@@ -2362,7 +2370,7 @@ static void createChildTables() {
len
=
snprintf
(
tblColsBuf
+
len
,
MAX_SQL_SIZE
-
len
,
")"
);
debugPrint
(
"DEBUG -
%s() LN%d: dbName: %s num of tb: %d schema: %s
\n
"
,
__func__
,
__LINE__
,
verbosePrint
(
"
%s() LN%d: dbName: %s num of tb: %d schema: %s
\n
"
,
__func__
,
__LINE__
,
g_Dbs
.
db
[
i
].
dbName
,
g_args
.
num_of_tables
,
tblColsBuf
);
startMultiThreadCreateChildTable
(
tblColsBuf
,
...
...
@@ -3224,7 +3232,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto
PARSE_OVER
;
}
cJSON
*
insertRows
=
cJSON_GetObjectItem
(
stbInfo
,
"insert_rows"
);
if
(
insertRows
&&
insertRows
->
type
==
cJSON_Number
)
{
g_Dbs
.
db
[
i
].
superTbls
[
j
].
insertRows
=
insertRows
->
valueint
;
...
...
@@ -3587,7 +3594,7 @@ PARSE_OVER:
}
static
bool
getInfoFromJsonFile
(
char
*
file
)
{
debugPrint
(
"
DEBUG -
%s %d %s
\n
"
,
__func__
,
__LINE__
,
file
);
debugPrint
(
"%s %d %s
\n
"
,
__func__
,
__LINE__
,
file
);
FILE
*
fp
=
fopen
(
file
,
"r"
);
if
(
!
fp
)
{
...
...
@@ -3746,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
return
(
-
1
);
}
}
dataLen
-=
2
;
dataLen
+=
snprintf
(
dataBuf
+
dataLen
,
maxLen
-
dataLen
,
")"
);
...
...
@@ -3776,7 +3784,6 @@ static void syncWriteForNumberOfTblInOneSql(
}
uint64_t
time_counter
=
winfo
->
start_time
;
int64_t
tmp_time
;
int
sampleUsePos
;
int64_t
st
=
0
;
...
...
@@ -3784,6 +3791,7 @@ static void syncWriteForNumberOfTblInOneSql(
for
(
int
i
=
0
;
i
<
superTblInfo
->
insertRows
;)
{
int32_t
tbl_id
=
0
;
for
(
int
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
)
{
int64_t
tmp_time
=
0
;
int
inserted
=
i
;
for
(
int
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
...
...
@@ -3920,7 +3928,7 @@ static void syncWriteForNumberOfTblInOneSql(
send_to_server:
if
(
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
...
...
@@ -3937,7 +3945,7 @@ send_to_server:
int64_t
endTs
;
startTs
=
taosGetTimestampUs
();
debugPrint
(
"
DEBUG
%s() LN%d buff: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
debugPrint
(
"%s() LN%d buff: %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
INSERT_TYPE
);
...
...
@@ -3950,16 +3958,16 @@ send_to_server:
if
(
delay
<
winfo
->
minDelay
)
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo
->
avgDelay
=
(
double
)
winfo
->
totalDelay
/
winfo
->
cntDelay
;
winfo
->
totalAffectedRows
+=
affectedRows
;
}
totalAffectedRows
+=
affectedRows
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRId64
", affected rows: %"
PRId64
"
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
//int64_t t2 = taosGetTimestampMs();
...
...
@@ -4090,15 +4098,18 @@ static void* syncWrite(void *sarg) {
uint64_t
st
=
0
;
uint64_t
et
=
0
;
for
(
int
i
=
0
;
i
<
g_args
.
num_of_DPT
;)
{
winfo
->
totalRowsInserted
=
0
;
winfo
->
totalAffectedRows
=
0
;
for
(
int
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
int
inserted
=
i
;
for
(
int
i
=
0
;
i
<
g_args
.
num_of_DPT
;)
{
int
tblInserted
=
i
;
int64_t
tmp_time
=
time_counter
;
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
"insert into %s.%s%d values
"
,
winfo
->
db_name
,
g_args
.
tb_prefix
,
tID
);
int
k
;
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
...
...
@@ -4122,13 +4133,15 @@ static void* syncWrite(void *sarg) {
}
pstr
+=
sprintf
(
pstr
,
" %s"
,
data
);
i
nserted
++
;
tblI
nserted
++
;
k
++
;
i
++
;
if
(
i
nserted
>=
g_args
.
num_of_DPT
)
if
(
tblI
nserted
>=
g_args
.
num_of_DPT
)
break
;
}
winfo
->
totalRowsInserted
+=
k
;
/* puts(buffer); */
int64_t
startTs
;
int64_t
endTs
;
...
...
@@ -4137,16 +4150,17 @@ static void* syncWrite(void *sarg) {
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
debugPrint
(
"DEBUG -
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
1
);
verbosePrint
(
"%s() LN%d: affectedRows:%d
\n
"
,
__func__
,
__LINE__
,
affectedRows
);
if
(
0
<=
affectedRows
){
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
...
...
@@ -4156,27 +4170,31 @@ static void* syncWrite(void *sarg) {
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo
->
totalAffectedRows
+=
affectedRows
;
winfo
->
avgDelay
=
(
double
)
winfo
->
totalDelay
/
winfo
->
cntDelay
;
}
verbosePrint
(
"%s() LN%d: totalaffectedRows:%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
winfo
->
totalAffectedRows
);
if
(
g_args
.
insert_interval
)
{
et
=
taosGetTimestampMs
();
}
if
(
tID
==
winfo
->
end_table_id
)
{
i
=
inserted
;
time_counter
=
tmp_time
;
}
if
(
tblInserted
>=
g_args
.
num_of_DPT
)
{
break
;
}
}
// num_of_DPT
}
// tId
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
winfo
->
threadID
,
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
}
return
NULL
;
}
static
void
*
syncWriteWithStb
(
void
*
sarg
)
{
uint64_t
totalRowsInserted
=
0
;
uint64_t
totalAffectedRows
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
threadInfo
*
winfo
=
(
threadInfo
*
)
sarg
;
...
...
@@ -4232,24 +4250,41 @@ static void* syncWriteWithStb(void *sarg) {
return
NULL
;
}
int64_t
time_counter
=
winfo
->
start_time
;
uint64_t
st
=
0
;
uint64_t
et
=
0
;
debugPrint
(
"DEBUG - %s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
insertRows
);
winfo
->
totalRowsInserted
=
0
;
winfo
->
totalAffectedRows
=
0
;
int
sampleUsePos
;
debugPrint
(
"%s() LN%d insertRows=%"
PRId64
"
\n
"
,
__func__
,
__LINE__
,
superTblInfo
->
insertRows
);
for
(
uint32_t
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
int64_t
start_time
=
winfo
->
start_time
;
for
(
int
i
=
0
;
i
<
superTblInfo
->
insertRows
;)
{
for
(
uint32_t
tID
=
winfo
->
start_table_id
;
tID
<=
winfo
->
end_table_id
;
tID
++
)
{
uint64_t
inserted
=
i
;
uint64_t
tmp_time
=
time_counter
;
int64_t
tblInserted
=
i
;
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
printf
(
"sleep: %d ms specified by insert_interval
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
sampleUsePos
=
samplePos
;
verbosePrint
(
"%s() LN%d num_of_RPR=%d
\n
"
,
__func__
,
__LINE__
,
g_args
.
num_of_RPR
);
int
sampleUsePos
=
samplePos
;
int
k
=
0
;
debugPrint
(
"DEBUG - %s() LN%d num_of_RPR=%d
\n
"
,
__func__
,
__LINE__
,
g_args
.
num_of_RPR
);
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
int
len
=
0
;
memset
(
buffer
,
0
,
superTblInfo
->
maxSqlLen
);
int
len
=
0
;
char
*
pstr
=
buffer
;
if
(
AUTO_CREATE_SUBTBL
==
superTblInfo
->
autoCreateTable
)
{
...
...
@@ -4290,12 +4325,14 @@ static void* syncWriteWithStb(void *sarg) {
tID
);
}
int
k
;
for
(
k
=
0
;
k
<
g_args
.
num_of_RPR
;)
{
int
retLen
=
0
;
if
(
0
==
strncasecmp
(
superTblInfo
->
dataSource
,
"sample"
,
strlen
(
"sample"
)))
{
retLen
=
getRowDataFromSample
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
start_time
+
superTblInfo
->
timeStampStep
*
i
,
superTblInfo
,
&
sampleUsePos
,
fp
,
...
...
@@ -4307,54 +4344,44 @@ static void* syncWriteWithStb(void *sarg) {
int
rand_num
=
rand_tinyint
()
%
100
;
if
(
0
!=
superTblInfo
->
disorderRatio
&&
rand_num
<
superTblInfo
->
disorderRatio
)
{
int64_t
d
=
tmp
_time
-
rand
()
%
superTblInfo
->
disorderRange
;
int64_t
d
=
start
_time
-
rand
()
%
superTblInfo
->
disorderRange
;
retLen
=
generateRowData
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
d
,
superTblInfo
->
maxSqlLen
-
len
,
d
,
superTblInfo
);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
}
else
{
retLen
=
generateRowData
(
pstr
+
len
,
superTblInfo
->
maxSqlLen
-
len
,
tmp_time
+=
superTblInfo
->
timeStampStep
,
start_time
+
superTblInfo
->
timeStampStep
*
i
,
superTblInfo
);
}
if
(
retLen
<
0
)
{
goto
free_and_statistics_2
;
}
}
/* len += retLen;
*/
inserted
++
;
len
+=
retLen
;
verbosePrint
(
"%s() LN%d retLen=%d len=%d k=%d buffer=%s
\n
"
,
__func__
,
__LINE__
,
retLen
,
len
,
k
,
buffer
);
tblInserted
++
;
k
++
;
totalRowsInserted
++
;
i
++
;
if
(
inserted
>
superTblInfo
->
insertRows
)
break
;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
if
(
tblInserted
>=
superTblInfo
->
insertRows
)
break
;
*/
if
(
i
>
0
&&
g_args
.
insert_interval
&&
(
g_args
.
insert_interval
>
(
et
-
st
)
))
{
int
sleep_time
=
g_args
.
insert_interval
-
(
et
-
st
);
debugPrint
(
"DEBUG sleep: %d ms
\n
"
,
sleep_time
);
taosMsleep
(
sleep_time
);
// ms
}
if
(
g_args
.
insert_interval
)
{
st
=
taosGetTimestampMs
();
}
winfo
->
totalRowsInserted
+=
k
;
if
(
0
==
strncasecmp
(
superTblInfo
->
insertMode
,
"taosc"
,
strlen
(
"taosc"
)))
{
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t
startTs
;
int64_t
endTs
;
startTs
=
taosGetTimestampUs
();
debugPrint
(
"DEBUG
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
verbosePrint
(
"
%s() LN%d %s
\n
"
,
__func__
,
__LINE__
,
buffer
);
int
affectedRows
=
queryDbExec
(
winfo
->
taos
,
buffer
,
INSERT_TYPE
);
if
(
0
>
affectedRows
){
...
...
@@ -4366,25 +4393,19 @@ static void* syncWriteWithStb(void *sarg) {
if
(
delay
<
winfo
->
minDelay
)
winfo
->
minDelay
=
delay
;
winfo
->
cntDelay
++
;
winfo
->
totalDelay
+=
delay
;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}
totalAffectedRows
+=
affectedRows
;
winfo
->
totalAffectedRows
+=
affectedRows
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRId64
", affected rows: %"
PRId64
"
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
}
else
{
//int64_t t1 = taosGetTimestampMs();
int
retCode
=
postProceSql
(
g_Dbs
.
host
,
g_Dbs
.
port
,
buffer
);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if
(
0
!=
retCode
)
{
printf
(
"========restful return fail, threadID[%d]
\n
"
,
winfo
->
threadID
);
...
...
@@ -4394,21 +4415,10 @@ static void* syncWriteWithStb(void *sarg) {
if
(
g_args
.
insert_interval
)
{
et
=
taosGetTimestampMs
();
}
/*
if (loop_cnt) {
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
if
(
tblInserted
>=
superTblInfo
->
insertRows
)
break
;
}
*/
}
}
// num_of_DPT
if
(
tID
==
winfo
->
end_table_id
)
{
if
(
0
==
strncasecmp
(
...
...
@@ -4416,26 +4426,19 @@ static void* syncWriteWithStb(void *sarg) {
samplePos
=
sampleUsePos
;
}
i
=
inserted
;
time_counter
=
tmp_time
;
}
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
}
// tID
free_and_statistics_2:
tmfree
(
buffer
);
tmfree
(
sampleDataBuf
);
tmfclose
(
fp
);
winfo
->
totalRowsInserted
=
totalRowsInserted
;
winfo
->
totalAffectedRows
=
totalAffectedRows
;
printf
(
"====thread[%d] completed total inserted rows: %"
PRId64
", total affected rows: %"
PRId64
"====
\n
"
,
winfo
->
threadID
,
totalRowsInserted
,
totalAffectedRows
);
winfo
->
totalRowsInserted
,
winfo
->
totalAffectedRows
);
return
NULL
;
}
...
...
@@ -4453,7 +4456,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
char
*
data
=
calloc
(
1
,
MAX_DATA_SIZE
);
char
*
pstr
=
buffer
;
pstr
+=
sprintf
(
pstr
,
"insert into %s.%s%d values"
,
winfo
->
db_name
,
winfo
->
tb_prefix
,
winfo
->
start_table_id
);
if
(
winfo
->
counter
>=
winfo
->
superTblInfo
->
insertRows
)
{
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if
(
winfo
->
counter
>=
g_args
.
num_of_RPR
)
{
winfo
->
start_table_id
++
;
winfo
->
counter
=
0
;
}
...
...
@@ -4644,13 +4648,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if
(
superTblInfo
)
{
superTblInfo
->
totalAffectedRows
+=
t_info
->
totalAffectedRows
;
superTblInfo
->
totalRowsInserted
+=
t_info
->
totalRowsInserted
;
}
totalDelay
+=
t_info
->
totalDelay
;
cntDelay
+=
t_info
->
cntDelay
;
if
(
t_info
->
maxDelay
>
maxDelay
)
maxDelay
=
t_info
->
maxDelay
;
if
(
t_info
->
minDelay
<
minDelay
)
minDelay
=
t_info
->
minDelay
;
}
}
cntDelay
-=
1
;
if
(
cntDelay
==
0
)
cntDelay
=
1
;
...
...
@@ -4698,11 +4702,12 @@ void *readTable(void *sarg) {
}
int
num_of_DPT
;
if
(
rinfo
->
superTblInfo
)
{
/*
if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT
=
g_args
.
num_of_DPT
;
}
//
}
int
num_of_tables
=
rinfo
->
end_table_id
-
rinfo
->
start_table_id
+
1
;
int
totalData
=
num_of_DPT
*
num_of_tables
;
...
...
@@ -4836,7 +4841,7 @@ int insertTestProcess() {
if
(
ret
==
-
1
)
exit
(
EXIT_FAILURE
);
debugPrint
(
"
DEBUG -
%d result file: %s
\n
"
,
__LINE__
,
g_Dbs
.
resultFile
);
debugPrint
(
"%d result file: %s
\n
"
,
__LINE__
,
g_Dbs
.
resultFile
);
g_fpOfInsertResult
=
fopen
(
g_Dbs
.
resultFile
,
"a"
);
if
(
NULL
==
g_fpOfInsertResult
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
g_Dbs
.
resultFile
);
...
...
@@ -5072,7 +5077,7 @@ static int queryTestProcess() {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
verbosePrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
(
void
)
queryDbExec
(
t_info
->
taos
,
sqlStr
,
NO_INSERT_TYPE
);
}
else
{
t_info
->
taos
=
NULL
;
...
...
@@ -5183,7 +5188,7 @@ void *subSubscribeProcess(void *sarg) {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
debugPrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
sqlStr
,
NO_INSERT_TYPE
)){
return
NULL
;
}
...
...
@@ -5249,7 +5254,7 @@ void *superSubscribeProcess(void *sarg) {
char
sqlStr
[
MAX_TB_NAME_SIZE
*
2
];
sprintf
(
sqlStr
,
"use %s"
,
g_queryInfo
.
dbName
);
debugPrint
(
"DEBUG
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
debugPrint
(
"
%s() %d sqlStr: %s
\n
"
,
__func__
,
__LINE__
,
sqlStr
);
if
(
0
!=
queryDbExec
(
winfo
->
taos
,
sqlStr
,
NO_INSERT_TYPE
))
{
return
NULL
;
}
...
...
@@ -5614,7 +5619,8 @@ void querySqlFile(TAOS* taos, char* sqlFile)
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
debugPrint
(
"DEBUG %s() LN%d cmd: %s
\n
"
,
__func__
,
__LINE__
,
cmd
);
verbosePrint
(
"%s() LN%d cmd: %s
\n
"
,
__func__
,
__LINE__
,
cmd
);
queryDbExec
(
taos
,
cmd
,
NO_INSERT_TYPE
);
if
(
0
!=
queryDbExec
(
taos
,
cmd
,
NO_INSERT_TYPE
))
{
printf
(
"queryDbExec %s failed!
\n
"
,
cmd
);
tmfree
(
cmd
);
...
...
@@ -5708,7 +5714,7 @@ static void testCmdLine() {
int
main
(
int
argc
,
char
*
argv
[])
{
parse_args
(
argc
,
argv
,
&
g_args
);
debugPrint
(
"
DEBUG -
meta file: %s
\n
"
,
g_args
.
metaFile
);
debugPrint
(
"meta file: %s
\n
"
,
g_args
.
metaFile
);
if
(
g_args
.
metaFile
)
{
initOfInsertMeta
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录