Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
f033362c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f033362c
编写于
9月 09, 2021
作者:
sangshuduo
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
split func for stmt interlace.
上级
791802f4
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
526 addition
and
88 deletion
+526
-88
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+525
-87
tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json
...est/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json
+1
-1
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
f033362c
...
@@ -244,7 +244,7 @@ typedef struct SArguments_S {
...
@@ -244,7 +244,7 @@ typedef struct SArguments_S {
uint64_t
insert_interval
;
uint64_t
insert_interval
;
uint64_t
timestamp_step
;
uint64_t
timestamp_step
;
int64_t
query_times
;
int64_t
query_times
;
uint32_t
interlace
_r
ows
;
uint32_t
interlace
R
ows
;
uint32_t
reqPerReq
;
// num_of_records_per_req
uint32_t
reqPerReq
;
// num_of_records_per_req
uint64_t
max_sql_len
;
uint64_t
max_sql_len
;
int64_t
ntables
;
int64_t
ntables
;
...
@@ -451,14 +451,13 @@ typedef struct SQueryMetaInfo_S {
...
@@ -451,14 +451,13 @@ typedef struct SQueryMetaInfo_S {
typedef
struct
SThreadInfo_S
{
typedef
struct
SThreadInfo_S
{
TAOS
*
taos
;
TAOS
*
taos
;
TAOS_STMT
*
stmt
;
TAOS_STMT
*
stmt
;
int64_t
*
bind_ts
;
#if STMT_BIND_PARAM_BATCH == 1
#if STMT_BIND_PARAM_BATCH == 1
int64_t
*
bind_ts
;
int64_t
*
bind_ts_array
;
int64_t
*
bind_ts_array
;
char
*
bindParams
;
char
*
bindParams
;
char
*
is_null
;
char
*
is_null
;
#else
#else
int64_t
*
bind_ts
;
char
*
sampleBindArray
;
char
*
sampleBindArray
;
#endif
#endif
...
@@ -607,8 +606,8 @@ char *g_rand_current_buff = NULL;
...
@@ -607,8 +606,8 @@ char *g_rand_current_buff = NULL;
char
*
g_rand_phase_buff
=
NULL
;
char
*
g_rand_phase_buff
=
NULL
;
char
*
g_randdouble_buff
=
NULL
;
char
*
g_randdouble_buff
=
NULL
;
char
*
g_aggreFunc
[]
=
{
"*"
,
"count(*)"
,
"avg(
col0)"
,
"sum(col
0)"
,
char
*
g_aggreFunc
[]
=
{
"*"
,
"count(*)"
,
"avg(
C0)"
,
"sum(C
0)"
,
"max(
col0)"
,
"min(col0)"
,
"first(col0)"
,
"last(col
0)"
};
"max(
C0)"
,
"min(C0)"
,
"first(C0)"
,
"last(C
0)"
};
SArguments
g_args
=
{
SArguments
g_args
=
{
NULL
,
// metaFile
NULL
,
// metaFile
...
@@ -652,7 +651,7 @@ SArguments g_args = {
...
@@ -652,7 +651,7 @@ SArguments g_args = {
0
,
// insert_interval
0
,
// insert_interval
DEFAULT_TIMESTAMP_STEP
,
// timestamp_step
DEFAULT_TIMESTAMP_STEP
,
// timestamp_step
1
,
// query_times
1
,
// query_times
DEFAULT_INTERLACE_ROWS
,
// interlace
_r
ows;
DEFAULT_INTERLACE_ROWS
,
// interlace
R
ows;
30000
,
// reqPerReq
30000
,
// reqPerReq
(
1024
*
1024
),
// max_sql_len
(
1024
*
1024
),
// max_sql_len
DEFAULT_CHILDTABLES
,
// ntables
DEFAULT_CHILDTABLES
,
// ntables
...
@@ -1310,17 +1309,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
...
@@ -1310,17 +1309,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2
(
argv
[
0
],
"B"
);
errorPrintReqArg2
(
argv
[
0
],
"B"
);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
}
}
arguments
->
interlace
_r
ows
=
atoi
(
argv
[
++
i
]);
arguments
->
interlace
R
ows
=
atoi
(
argv
[
++
i
]);
}
else
if
(
0
==
strncmp
(
argv
[
i
],
"--interlace-rows="
,
strlen
(
"--interlace-rows="
)))
{
}
else
if
(
0
==
strncmp
(
argv
[
i
],
"--interlace-rows="
,
strlen
(
"--interlace-rows="
)))
{
if
(
isStringNumber
((
char
*
)(
argv
[
i
]
+
strlen
(
"--interlace-rows="
))))
{
if
(
isStringNumber
((
char
*
)(
argv
[
i
]
+
strlen
(
"--interlace-rows="
))))
{
arguments
->
interlace
_r
ows
=
atoi
((
char
*
)(
argv
[
i
]
+
strlen
(
"--interlace-rows="
)));
arguments
->
interlace
R
ows
=
atoi
((
char
*
)(
argv
[
i
]
+
strlen
(
"--interlace-rows="
)));
}
else
{
}
else
{
errorPrintReqArg2
(
argv
[
0
],
"--interlace-rows"
);
errorPrintReqArg2
(
argv
[
0
],
"--interlace-rows"
);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
}
}
}
else
if
(
0
==
strncmp
(
argv
[
i
],
"-B"
,
strlen
(
"-B"
)))
{
}
else
if
(
0
==
strncmp
(
argv
[
i
],
"-B"
,
strlen
(
"-B"
)))
{
if
(
isStringNumber
((
char
*
)(
argv
[
i
]
+
strlen
(
"-B"
))))
{
if
(
isStringNumber
((
char
*
)(
argv
[
i
]
+
strlen
(
"-B"
))))
{
arguments
->
interlace
_r
ows
=
atoi
((
char
*
)(
argv
[
i
]
+
strlen
(
"-B"
)));
arguments
->
interlace
R
ows
=
atoi
((
char
*
)(
argv
[
i
]
+
strlen
(
"-B"
)));
}
else
{
}
else
{
errorPrintReqArg2
(
argv
[
0
],
"-B"
);
errorPrintReqArg2
(
argv
[
0
],
"-B"
);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
...
@@ -1333,7 +1332,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
...
@@ -1333,7 +1332,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2
(
argv
[
0
],
"--interlace-rows"
);
errorPrintReqArg2
(
argv
[
0
],
"--interlace-rows"
);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
}
}
arguments
->
interlace
_r
ows
=
atoi
(
argv
[
++
i
]);
arguments
->
interlace
R
ows
=
atoi
(
argv
[
++
i
]);
}
else
{
}
else
{
errorUnrecognized
(
argv
[
0
],
argv
[
i
]);
errorUnrecognized
(
argv
[
0
],
argv
[
i
]);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
...
@@ -4859,15 +4858,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
...
@@ -4859,15 +4858,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON
*
interlaceRows
=
cJSON_GetObjectItem
(
root
,
"interlace_rows"
);
cJSON
*
interlaceRows
=
cJSON_GetObjectItem
(
root
,
"interlace_rows"
);
if
(
interlaceRows
&&
interlaceRows
->
type
==
cJSON_Number
)
{
if
(
interlaceRows
&&
interlaceRows
->
type
==
cJSON_Number
)
{
if
(
interlaceRows
->
valueint
<
0
)
{
if
(
interlaceRows
->
valueint
<
0
)
{
errorPrint
(
"%s"
,
"failed to read json, interlace
_r
ows input mistake
\n
"
);
errorPrint
(
"%s"
,
"failed to read json, interlace
R
ows input mistake
\n
"
);
goto
PARSE_OVER
;
goto
PARSE_OVER
;
}
}
g_args
.
interlace
_r
ows
=
interlaceRows
->
valueint
;
g_args
.
interlace
R
ows
=
interlaceRows
->
valueint
;
}
else
if
(
!
interlaceRows
)
{
}
else
if
(
!
interlaceRows
)
{
g_args
.
interlace
_r
ows
=
0
;
// 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
g_args
.
interlace
R
ows
=
0
;
// 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
}
else
{
}
else
{
errorPrint
(
"%s"
,
"failed to read json, interlace
_r
ows input mistake
\n
"
);
errorPrint
(
"%s"
,
"failed to read json, interlace
R
ows input mistake
\n
"
);
goto
PARSE_OVER
;
goto
PARSE_OVER
;
}
}
...
@@ -4929,13 +4928,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
...
@@ -4929,13 +4928,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
}
// rows per table need be less than insert batch
// rows per table need be less than insert batch
if
(
g_args
.
interlace
_r
ows
>
g_args
.
reqPerReq
)
{
if
(
g_args
.
interlace
R
ows
>
g_args
.
reqPerReq
)
{
printf
(
"NOTICE: interlace rows value %u > num_of_records_per_req %u
\n\n
"
,
printf
(
"NOTICE: interlace rows value %u > num_of_records_per_req %u
\n\n
"
,
g_args
.
interlace
_r
ows
,
g_args
.
reqPerReq
);
g_args
.
interlace
R
ows
,
g_args
.
reqPerReq
);
printf
(
" interlace rows value will be set to num_of_records_per_req %u
\n\n
"
,
printf
(
" interlace rows value will be set to num_of_records_per_req %u
\n\n
"
,
g_args
.
reqPerReq
);
g_args
.
reqPerReq
);
prompt
();
prompt
();
g_args
.
interlace
_r
ows
=
g_args
.
reqPerReq
;
g_args
.
interlace
R
ows
=
g_args
.
reqPerReq
;
}
}
cJSON
*
dbs
=
cJSON_GetObjectItem
(
root
,
"databases"
);
cJSON
*
dbs
=
cJSON_GetObjectItem
(
root
,
"databases"
);
...
@@ -8462,13 +8461,13 @@ static void printStatPerThread(threadInfo *pThreadInfo)
...
@@ -8462,13 +8461,13 @@ static void printStatPerThread(threadInfo *pThreadInfo)
);
);
}
}
// sync write interlace data
#if STMT_BIND_PARAM_BATCH == 1
static
void
*
syncWriteInterlace
(
threadInfo
*
pThreadInfo
)
{
// stmt sync write interlace data
debugPrint
(
"[%d] %s() LN%d: ### interlace write
\n
"
,
static
void
*
syncWriteInterlaceStmtBatch
(
threadInfo
*
pThreadInfo
,
uint32_t
interlaceRows
)
{
debugPrint
(
"[%d] %s() LN%d: ### stmt interlace write
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
int64_t
insertRows
;
uint32_t
interlaceRows
;
uint64_t
maxSqlLen
;
uint64_t
maxSqlLen
;
int64_t
nTimeStampStep
;
int64_t
nTimeStampStep
;
uint64_t
insert_interval
;
uint64_t
insert_interval
;
...
@@ -8477,19 +8476,235 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8477,19 +8476,235 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if
(
stbInfo
)
{
if
(
stbInfo
)
{
insertRows
=
stbInfo
->
insertRows
;
insertRows
=
stbInfo
->
insertRows
;
maxSqlLen
=
stbInfo
->
maxSqlLen
;
nTimeStampStep
=
stbInfo
->
timeStampStep
;
insert_interval
=
stbInfo
->
insertInterval
;
}
else
{
insertRows
=
g_args
.
insertRows
;
maxSqlLen
=
g_args
.
max_sql_len
;
nTimeStampStep
=
g_args
.
timestamp_step
;
insert_interval
=
g_args
.
insert_interval
;
}
if
((
stbInfo
->
interlaceRows
==
0
)
debugPrint
(
"[%d] %s() LN%d: start_table_from=%"
PRIu64
" ntables=%"
PRId64
" insertRows=%"
PRIu64
"
\n
"
,
&&
(
g_args
.
interlace_rows
>
0
))
{
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
interlaceRows
=
g_args
.
interlace_rows
;
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
uint32_t
batchPerTbl
=
interlaceRows
;
uint32_t
batchPerTblTimes
;
if
(
interlaceRows
>
g_args
.
reqPerReq
)
interlaceRows
=
g_args
.
reqPerReq
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
reqPerReq
/
interlaceRows
;
}
else
{
}
else
{
interlaceRows
=
stbInfo
->
interlaceRows
;
batchPerTblTimes
=
1
;
}
pThreadInfo
->
totalInsertRows
=
0
;
pThreadInfo
->
totalAffectedRows
=
0
;
uint64_t
st
=
0
;
uint64_t
et
=
UINT64_MAX
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
uint64_t
endTs
;
uint64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
int64_t
startTime
=
pThreadInfo
->
start_time
;
uint64_t
generatedRecPerTbl
=
0
;
bool
flagSleep
=
true
;
uint64_t
sleepTimeTotal
=
0
;
int
percentComplete
=
0
;
int64_t
totalRows
=
insertRows
*
pThreadInfo
->
ntables
;
while
(
pThreadInfo
->
totalInsertRows
<
pThreadInfo
->
ntables
*
insertRows
)
{
if
((
flagSleep
)
&&
(
insert_interval
))
{
st
=
taosGetTimestampMs
();
flagSleep
=
false
;
}
uint32_t
recOfBatch
=
0
;
int32_t
generated
;
for
(
uint64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
char
tableName
[
TSDB_TABLE_NAME_LEN
];
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
if
(
0
==
strlen
(
tableName
))
{
errorPrint2
(
"[%d] %s() LN%d, getTableName return null
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
return
NULL
;
}
if
(
stbInfo
)
{
generated
=
prepareStbStmtWithSample
(
pThreadInfo
,
tableName
,
tableSeq
,
batchPerTbl
,
insertRows
,
0
,
startTime
,
&
(
pThreadInfo
->
samplePos
));
}
else
{
debugPrint
(
"[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
tableName
,
batchPerTbl
,
startTime
);
generated
=
prepareStmtWithoutStb
(
pThreadInfo
,
tableName
,
batchPerTbl
,
insertRows
,
i
,
startTime
);
}
debugPrint
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
if
(
generated
<
0
)
{
errorPrint2
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
goto
free_of_interlace_stmt
;
}
else
if
(
generated
==
0
)
{
break
;
}
}
tableSeq
++
;
recOfBatch
+=
batchPerTbl
;
pThreadInfo
->
totalInsertRows
+=
batchPerTbl
;
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
,
recOfBatch
);
if
(
tableSeq
==
pThreadInfo
->
start_table_from
+
pThreadInfo
->
ntables
)
{
// turn to first table
tableSeq
=
pThreadInfo
->
start_table_from
;
generatedRecPerTbl
+=
batchPerTbl
;
startTime
=
pThreadInfo
->
start_time
+
generatedRecPerTbl
*
nTimeStampStep
;
flagSleep
=
true
;
if
(
generatedRecPerTbl
>=
insertRows
)
break
;
int64_t
remainRows
=
insertRows
-
generatedRecPerTbl
;
if
((
remainRows
>
0
)
&&
(
batchPerTbl
>
remainRows
))
batchPerTbl
=
remainRows
;
if
(
pThreadInfo
->
ntables
*
batchPerTbl
<
g_args
.
reqPerReq
)
break
;
}
verbosePrint
(
"[%d] %s() LN%d generatedRecPerTbl=%"
PRId64
" insertRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generatedRecPerTbl
,
insertRows
);
if
((
g_args
.
reqPerReq
-
recOfBatch
)
<
batchPerTbl
)
break
;
}
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
pThreadInfo
->
totalInsertRows
);
startTs
=
taosGetTimestampUs
();
if
(
recOfBatch
==
0
)
{
errorPrint2
(
"[%d] %s() LN%d Failed to insert records of batch %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
);
if
(
batchPerTbl
>
0
)
{
errorPrint
(
"
\t
If the batch is %d, the length of the SQL to insert a row must be less then %"
PRId64
"
\n
"
,
batchPerTbl
,
maxSqlLen
/
batchPerTbl
);
}
goto
free_of_interlace_stmt
;
}
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
recOfBatch
);
endTs
=
taosGetTimestampUs
();
uint64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %10.2f ms
\n
"
,
__func__
,
__LINE__
,
delay
/
1000
.
0
);
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
(
delay
>
pThreadInfo
->
maxDelay
)
pThreadInfo
->
maxDelay
=
delay
;
if
(
delay
<
pThreadInfo
->
minDelay
)
pThreadInfo
->
minDelay
=
delay
;
pThreadInfo
->
cntDelay
++
;
pThreadInfo
->
totalDelay
+=
delay
;
if
(
recOfBatch
!=
affectedRows
)
{
errorPrint2
(
"[%d] %s() LN%d execInsert insert %d, affected rows: %"
PRId64
"
\n\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
affectedRows
);
goto
free_of_interlace_stmt
;
}
pThreadInfo
->
totalAffectedRows
+=
affectedRows
;
int
currentPercent
=
pThreadInfo
->
totalAffectedRows
*
100
/
totalRows
;
if
(
currentPercent
>
percentComplete
)
{
printf
(
"[%d]:%d%%
\n
"
,
pThreadInfo
->
threadID
,
currentPercent
);
percentComplete
=
currentPercent
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRIu64
", affected rows: %"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
if
((
insert_interval
)
&&
flagSleep
)
{
et
=
taosGetTimestampMs
();
if
(
insert_interval
>
(
et
-
st
)
)
{
uint64_t
sleepTime
=
insert_interval
-
(
et
-
st
);
performancePrint
(
"%s() LN%d sleep: %"
PRId64
" ms for insert interval
\n
"
,
__func__
,
__LINE__
,
sleepTime
);
taosMsleep
(
sleepTime
);
// ms
sleepTimeTotal
+=
insert_interval
;
}
}
}
if
(
percentComplete
<
100
)
printf
(
"[%d]:%d%%
\n
"
,
pThreadInfo
->
threadID
,
percentComplete
);
free_of_interlace_stmt:
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
#else
// stmt sync write interlace data
static
void
*
syncWriteInterlaceStmt
(
threadInfo
*
pThreadInfo
,
uint32_t
interlaceRows
)
{
debugPrint
(
"[%d] %s() LN%d: ### stmt interlace write
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
uint64_t
maxSqlLen
;
int64_t
nTimeStampStep
;
uint64_t
insert_interval
;
SSuperTable
*
stbInfo
=
pThreadInfo
->
stbInfo
;
if
(
stbInfo
)
{
insertRows
=
stbInfo
->
insertRows
;
maxSqlLen
=
stbInfo
->
maxSqlLen
;
maxSqlLen
=
stbInfo
->
maxSqlLen
;
nTimeStampStep
=
stbInfo
->
timeStampStep
;
nTimeStampStep
=
stbInfo
->
timeStampStep
;
insert_interval
=
stbInfo
->
insertInterval
;
insert_interval
=
stbInfo
->
insertInterval
;
}
else
{
}
else
{
insertRows
=
g_args
.
insertRows
;
insertRows
=
g_args
.
insertRows
;
interlaceRows
=
g_args
.
interlace_rows
;
maxSqlLen
=
g_args
.
max_sql_len
;
maxSqlLen
=
g_args
.
max_sql_len
;
nTimeStampStep
=
g_args
.
timestamp_step
;
nTimeStampStep
=
g_args
.
timestamp_step
;
insert_interval
=
g_args
.
insert_interval
;
insert_interval
=
g_args
.
insert_interval
;
...
@@ -8500,9 +8715,232 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8500,9 +8715,232 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo
->
start_table_from
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
pThreadInfo
->
ntables
,
insertRows
);
if
(
interlaceRows
>
insertRows
)
uint32_t
batchPerTbl
=
interlaceRows
;
interlaceRows
=
insertRows
;
uint32_t
batchPerTblTimes
;
if
(
interlaceRows
>
g_args
.
reqPerReq
)
interlaceRows
=
g_args
.
reqPerReq
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
g_args
.
reqPerReq
/
interlaceRows
;
}
else
{
batchPerTblTimes
=
1
;
}
pThreadInfo
->
totalInsertRows
=
0
;
pThreadInfo
->
totalAffectedRows
=
0
;
uint64_t
st
=
0
;
uint64_t
et
=
UINT64_MAX
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
uint64_t
endTs
;
uint64_t
tableSeq
=
pThreadInfo
->
start_table_from
;
int64_t
startTime
=
pThreadInfo
->
start_time
;
uint64_t
generatedRecPerTbl
=
0
;
bool
flagSleep
=
true
;
uint64_t
sleepTimeTotal
=
0
;
int
percentComplete
=
0
;
int64_t
totalRows
=
insertRows
*
pThreadInfo
->
ntables
;
while
(
pThreadInfo
->
totalInsertRows
<
pThreadInfo
->
ntables
*
insertRows
)
{
if
((
flagSleep
)
&&
(
insert_interval
))
{
st
=
taosGetTimestampMs
();
flagSleep
=
false
;
}
uint32_t
recOfBatch
=
0
;
int32_t
generated
;
for
(
uint64_t
i
=
0
;
i
<
batchPerTblTimes
;
i
++
)
{
char
tableName
[
TSDB_TABLE_NAME_LEN
];
getTableName
(
tableName
,
pThreadInfo
,
tableSeq
);
if
(
0
==
strlen
(
tableName
))
{
errorPrint2
(
"[%d] %s() LN%d, getTableName return null
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
return
NULL
;
}
if
(
stbInfo
)
{
generated
=
prepareStbStmtWithSample
(
pThreadInfo
,
tableName
,
tableSeq
,
batchPerTbl
,
insertRows
,
0
,
startTime
,
&
(
pThreadInfo
->
samplePos
));
}
else
{
debugPrint
(
"[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
tableName
,
batchPerTbl
,
startTime
);
generated
=
prepareStmtWithoutStb
(
pThreadInfo
,
tableName
,
batchPerTbl
,
insertRows
,
i
,
startTime
);
}
debugPrint
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
if
(
generated
<
0
)
{
errorPrint2
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
goto
free_of_interlace_stmt
;
}
else
if
(
generated
==
0
)
{
break
;
}
tableSeq
++
;
recOfBatch
+=
batchPerTbl
;
pThreadInfo
->
totalInsertRows
+=
batchPerTbl
;
verbosePrint
(
"[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
,
recOfBatch
);
if
(
tableSeq
==
pThreadInfo
->
start_table_from
+
pThreadInfo
->
ntables
)
{
// turn to first table
tableSeq
=
pThreadInfo
->
start_table_from
;
generatedRecPerTbl
+=
batchPerTbl
;
startTime
=
pThreadInfo
->
start_time
+
generatedRecPerTbl
*
nTimeStampStep
;
flagSleep
=
true
;
if
(
generatedRecPerTbl
>=
insertRows
)
break
;
int64_t
remainRows
=
insertRows
-
generatedRecPerTbl
;
if
((
remainRows
>
0
)
&&
(
batchPerTbl
>
remainRows
))
batchPerTbl
=
remainRows
;
if
(
pThreadInfo
->
ntables
*
batchPerTbl
<
g_args
.
reqPerReq
)
break
;
}
verbosePrint
(
"[%d] %s() LN%d generatedRecPerTbl=%"
PRId64
" insertRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generatedRecPerTbl
,
insertRows
);
if
((
g_args
.
reqPerReq
-
recOfBatch
)
<
batchPerTbl
)
break
;
}
verbosePrint
(
"[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
pThreadInfo
->
totalInsertRows
);
startTs
=
taosGetTimestampUs
();
if
(
recOfBatch
==
0
)
{
errorPrint2
(
"[%d] %s() LN%d Failed to insert records of batch %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
batchPerTbl
);
if
(
batchPerTbl
>
0
)
{
errorPrint
(
"
\t
If the batch is %d, the length of the SQL to insert a row must be less then %"
PRId64
"
\n
"
,
batchPerTbl
,
maxSqlLen
/
batchPerTbl
);
}
goto
free_of_interlace_stmt
;
}
int64_t
affectedRows
=
execInsert
(
pThreadInfo
,
recOfBatch
);
endTs
=
taosGetTimestampUs
();
uint64_t
delay
=
endTs
-
startTs
;
performancePrint
(
"%s() LN%d, insert execution time is %10.2f ms
\n
"
,
__func__
,
__LINE__
,
delay
/
1000
.
0
);
verbosePrint
(
"[%d] %s() LN%d affectedRows=%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
affectedRows
);
if
(
delay
>
pThreadInfo
->
maxDelay
)
pThreadInfo
->
maxDelay
=
delay
;
if
(
delay
<
pThreadInfo
->
minDelay
)
pThreadInfo
->
minDelay
=
delay
;
pThreadInfo
->
cntDelay
++
;
pThreadInfo
->
totalDelay
+=
delay
;
if
(
recOfBatch
!=
affectedRows
)
{
errorPrint2
(
"[%d] %s() LN%d execInsert insert %d, affected rows: %"
PRId64
"
\n\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
recOfBatch
,
affectedRows
);
goto
free_of_interlace_stmt
;
}
pThreadInfo
->
totalAffectedRows
+=
affectedRows
;
int
currentPercent
=
pThreadInfo
->
totalAffectedRows
*
100
/
totalRows
;
if
(
currentPercent
>
percentComplete
)
{
printf
(
"[%d]:%d%%
\n
"
,
pThreadInfo
->
threadID
,
currentPercent
);
percentComplete
=
currentPercent
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
30
*
1000
)
{
printf
(
"thread[%d] has currently inserted rows: %"
PRIu64
", affected rows: %"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
pThreadInfo
->
totalInsertRows
,
pThreadInfo
->
totalAffectedRows
);
lastPrintTime
=
currentPrintTime
;
}
if
((
insert_interval
)
&&
flagSleep
)
{
et
=
taosGetTimestampMs
();
if
(
insert_interval
>
(
et
-
st
)
)
{
uint64_t
sleepTime
=
insert_interval
-
(
et
-
st
);
performancePrint
(
"%s() LN%d sleep: %"
PRId64
" ms for insert interval
\n
"
,
__func__
,
__LINE__
,
sleepTime
);
taosMsleep
(
sleepTime
);
// ms
sleepTimeTotal
+=
insert_interval
;
}
}
}
if
(
percentComplete
<
100
)
printf
(
"[%d]:%d%%
\n
"
,
pThreadInfo
->
threadID
,
percentComplete
);
free_of_interlace_stmt:
printStatPerThread
(
pThreadInfo
);
return
NULL
;
}
#endif
// sync write interlace data
static
void
*
syncWriteInterlace
(
threadInfo
*
pThreadInfo
,
uint32_t
interlaceRows
)
{
debugPrint
(
"[%d] %s() LN%d: ### interlace write
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
);
int64_t
insertRows
;
uint64_t
maxSqlLen
;
int64_t
nTimeStampStep
;
uint64_t
insert_interval
;
SSuperTable
*
stbInfo
=
pThreadInfo
->
stbInfo
;
if
(
stbInfo
)
{
insertRows
=
stbInfo
->
insertRows
;
maxSqlLen
=
stbInfo
->
maxSqlLen
;
nTimeStampStep
=
stbInfo
->
timeStampStep
;
insert_interval
=
stbInfo
->
insertInterval
;
}
else
{
insertRows
=
g_args
.
insertRows
;
maxSqlLen
=
g_args
.
max_sql_len
;
nTimeStampStep
=
g_args
.
timestamp_step
;
insert_interval
=
g_args
.
insert_interval
;
}
debugPrint
(
"[%d] %s() LN%d: start_table_from=%"
PRIu64
" ntables=%"
PRId64
" insertRows=%"
PRIu64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
pThreadInfo
->
start_table_from
,
pThreadInfo
->
ntables
,
insertRows
);
#if 1
if
(
interlaceRows
>
g_args
.
reqPerReq
)
if
(
interlaceRows
>
g_args
.
reqPerReq
)
interlaceRows
=
g_args
.
reqPerReq
;
interlaceRows
=
g_args
.
reqPerReq
;
...
@@ -8515,7 +8953,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8515,7 +8953,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
else
{
}
else
{
batchPerTblTimes
=
1
;
batchPerTblTimes
=
1
;
}
}
#else
uint32_t
batchPerTbl
;
if
(
interlaceRows
>
g_args
.
reqPerReq
)
batchPerTbl
=
g_args
.
reqPerReq
;
else
batchPerTbl
=
interlaceRows
;
uint32_t
batchPerTblTimes
;
if
((
interlaceRows
>
0
)
&&
(
pThreadInfo
->
ntables
>
1
))
{
batchPerTblTimes
=
interlaceRows
/
batchPerTbl
;
}
else
{
batchPerTblTimes
=
1
;
}
#endif
pThreadInfo
->
buffer
=
calloc
(
maxSqlLen
,
1
);
pThreadInfo
->
buffer
=
calloc
(
maxSqlLen
,
1
);
if
(
NULL
==
pThreadInfo
->
buffer
)
{
if
(
NULL
==
pThreadInfo
->
buffer
)
{
errorPrint2
(
"%s() LN%d, Failed to alloc %"
PRIu64
" Bytes, reason:%s
\n
"
,
errorPrint2
(
"%s() LN%d, Failed to alloc %"
PRIu64
" Bytes, reason:%s
\n
"
,
...
@@ -8548,6 +9001,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8548,6 +9001,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
st
=
taosGetTimestampMs
();
st
=
taosGetTimestampMs
();
flagSleep
=
false
;
flagSleep
=
false
;
}
}
// generate data
// generate data
memset
(
pThreadInfo
->
buffer
,
0
,
maxSqlLen
);
memset
(
pThreadInfo
->
buffer
,
0
,
maxSqlLen
);
uint64_t
remainderBufLen
=
maxSqlLen
;
uint64_t
remainderBufLen
=
maxSqlLen
;
...
@@ -8576,16 +9030,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8576,16 +9030,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t
oldRemainderLen
=
remainderBufLen
;
uint64_t
oldRemainderLen
=
remainderBufLen
;
if
(
stbInfo
)
{
if
(
stbInfo
)
{
if
(
stbInfo
->
iface
==
STMT_IFACE
)
{
generated
=
prepareStbStmtWithSample
(
pThreadInfo
,
tableName
,
tableSeq
,
batchPerTbl
,
insertRows
,
0
,
startTime
,
&
(
pThreadInfo
->
samplePos
));
}
else
{
generated
=
generateStbInterlaceData
(
generated
=
generateStbInterlaceData
(
pThreadInfo
,
pThreadInfo
,
tableName
,
batchPerTbl
,
i
,
tableName
,
batchPerTbl
,
i
,
...
@@ -8595,19 +9039,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8595,19 +9039,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
insertRows
,
insertRows
,
startTime
,
startTime
,
&
remainderBufLen
);
&
remainderBufLen
);
}
}
else
{
if
(
g_args
.
iface
==
STMT_IFACE
)
{
debugPrint
(
"[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"
PRId64
"
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
tableName
,
batchPerTbl
,
startTime
);
generated
=
prepareStmtWithoutStb
(
pThreadInfo
,
tableName
,
batchPerTbl
,
insertRows
,
i
,
startTime
);
}
else
{
}
else
{
generated
=
generateInterlaceDataWithoutStb
(
generated
=
generateInterlaceDataWithoutStb
(
tableName
,
batchPerTbl
,
tableName
,
batchPerTbl
,
...
@@ -8617,7 +9048,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
...
@@ -8617,7 +9048,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTime
,
startTime
,
&
remainderBufLen
);
&
remainderBufLen
);
}
}
}
debugPrint
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
debugPrint
(
"[%d] %s() LN%d, generated records is %d
\n
"
,
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
pThreadInfo
->
threadID
,
__func__
,
__LINE__
,
generated
);
...
@@ -8932,23 +9362,29 @@ static void* syncWrite(void *sarg) {
...
@@ -8932,23 +9362,29 @@ static void* syncWrite(void *sarg) {
setThreadName
(
"syncWrite"
);
setThreadName
(
"syncWrite"
);
uint32_t
interlaceRows
;
uint32_t
interlaceRows
=
0
;
if
(
stbInfo
)
{
if
(
stbInfo
)
{
if
((
stbInfo
->
interlaceRows
==
0
)
if
(
stbInfo
->
interlaceRows
<
stbInfo
->
insertRows
)
&&
(
g_args
.
interlace_rows
>
0
))
{
interlaceRows
=
g_args
.
interlace_rows
;
}
else
{
interlaceRows
=
stbInfo
->
interlaceRows
;
interlaceRows
=
stbInfo
->
interlaceRows
;
}
}
else
{
}
else
{
interlaceRows
=
g_args
.
interlace_rows
;
if
(
g_args
.
interlaceRows
<
g_args
.
insertRows
)
interlaceRows
=
g_args
.
interlaceRows
;
}
}
if
(
interlaceRows
>
0
)
{
if
(
interlaceRows
>
0
)
{
// interlace mode
// interlace mode
return
syncWriteInterlace
(
pThreadInfo
);
if
(((
stbInfo
)
&&
(
STMT_IFACE
==
stbInfo
->
iface
))
||
(
STMT_IFACE
==
g_args
.
iface
))
{
#if STMT_BIND_PARAM_BATCH == 1
return
syncWriteInterlaceStmtBatch
(
pThreadInfo
,
interlaceRows
);
#else
return
syncWriteInterlaceStmt
(
pThreadInfo
,
interlaceRows
);
#endif
}
else
{
}
else
{
return
syncWriteInterlace
(
pThreadInfo
,
interlaceRows
);
}
}
else
{
// progressive mode
// progressive mode
return
syncWriteProgressive
(
pThreadInfo
);
return
syncWriteProgressive
(
pThreadInfo
);
}
}
...
@@ -9231,22 +9667,25 @@ static void startMultiThreadInsertData(int threads, char* db_name,
...
@@ -9231,22 +9667,25 @@ static void startMultiThreadInsertData(int threads, char* db_name,
assert
(
stmtBuffer
);
assert
(
stmtBuffer
);
#if STMT_BIND_PARAM_BATCH == 1
#if STMT_BIND_PARAM_BATCH == 1
uint32_t
interlaceRows
;
uint32_t
interlaceRows
=
0
;
uint32_t
batch
;
uint32_t
batch
;
if
(
stbInfo
)
{
if
(
stbInfo
)
{
if
((
stbInfo
->
interlaceRows
==
0
)
if
((
stbInfo
->
interlaceRows
==
0
)
&&
(
g_args
.
interlace_rows
>
0
))
{
&&
(
g_args
.
interlaceRows
>
0
)
interlaceRows
=
g_args
.
interlace_rows
;
)
{
interlaceRows
=
g_args
.
interlaceRows
;
if
(
interlaceRows
>
stbInfo
->
insertRows
)
{
interlaceRows
=
stbInfo
->
insertRows
;
}
}
else
{
}
else
{
interlaceRows
=
stbInfo
->
interlaceRows
;
interlaceRows
=
stbInfo
->
interlaceRows
;
}
}
if
(
interlaceRows
>
stbInfo
->
insertRows
)
{
interlaceRows
=
0
;
}
}
else
{
}
else
{
interlaceRows
=
g_args
.
interlace_rows
;
if
(
g_args
.
interlaceRows
<
g_args
.
insertRows
)
interlaceRows
=
g_args
.
interlaceRows
;
}
}
if
(
interlaceRows
>
0
)
{
if
(
interlaceRows
>
0
)
{
...
@@ -9408,13 +9847,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
...
@@ -9408,13 +9847,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
taos_stmt_close
(
pThreadInfo
->
stmt
);
taos_stmt_close
(
pThreadInfo
->
stmt
);
}
}
#if STMT_BIND_PARAM_BATCH == 1
tmfree
((
char
*
)
pThreadInfo
->
bind_ts
);
tmfree
((
char
*
)
pThreadInfo
->
bind_ts
);
#if STMT_BIND_PARAM_BATCH == 1
tmfree
((
char
*
)
pThreadInfo
->
bind_ts_array
);
tmfree
((
char
*
)
pThreadInfo
->
bind_ts_array
);
tmfree
(
pThreadInfo
->
bindParams
);
tmfree
(
pThreadInfo
->
bindParams
);
tmfree
(
pThreadInfo
->
is_null
);
tmfree
(
pThreadInfo
->
is_null
);
#else
#else
tmfree
((
char
*
)
pThreadInfo
->
bind_ts
);
if
(
pThreadInfo
->
sampleBindArray
)
{
if
(
pThreadInfo
->
sampleBindArray
)
{
for
(
int
k
=
0
;
k
<
MAX_SAMPLES
;
k
++
)
{
for
(
int
k
=
0
;
k
<
MAX_SAMPLES
;
k
++
)
{
uintptr_t
*
tmp
=
(
uintptr_t
*
)(
*
(
uintptr_t
*
)(
uintptr_t
*
tmp
=
(
uintptr_t
*
)(
*
(
uintptr_t
*
)(
...
...
tests/pytest/tools/taosdemoAllTest/insertInterlaceRowsLarge1M.json
浏览文件 @
f033362c
...
@@ -41,7 +41,7 @@
...
@@ -41,7 +41,7 @@
"batch_create_tbl_num"
:
10
,
"batch_create_tbl_num"
:
10
,
"data_source"
:
"rand"
,
"data_source"
:
"rand"
,
"insert_mode"
:
"taosc"
,
"insert_mode"
:
"taosc"
,
"insert_rows"
:
100
0
,
"insert_rows"
:
100
1
,
"childtable_limit"
:
0
,
"childtable_limit"
:
0
,
"childtable_offset"
:
0
,
"childtable_offset"
:
0
,
"multi_thread_write_one_tbl"
:
"no"
,
"multi_thread_write_one_tbl"
:
"no"
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录