Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e26710bc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e26710bc
编写于
1月 19, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/tkv
上级
73f802c3
b3526ee2
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
521 addition
and
109 deletion
+521
-109
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+8
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+1
-0
source/dnode/mnode/impl/src/mndAuth.c
source/dnode/mnode/impl/src/mndAuth.c
+12
-3
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+140
-33
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+253
-11
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/script/sh/massiveTable/deployCluster.sh
tests/script/sh/massiveTable/deployCluster.sh
+56
-5
tests/test/c/create_table.c
tests/test/c/create_table.c
+49
-57
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
e26710bc
...
...
@@ -114,6 +114,14 @@ void schedulerDestroy(void);
*/
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
);
/**
* make one task info's multiple copies
* @param src
* @param dst SArray**<STaskInfo>
* @return
*/
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
);
void
schedulerFreeTaskList
(
SArray
*
taskList
);
...
...
include/util/taoserror.h
浏览文件 @
e26710bc
...
...
@@ -361,6 +361,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
e26710bc
...
...
@@ -841,6 +841,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SVnodeObj
*
pVnode
=
dndAcquireVnode
(
pDnode
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
{
dError
(
"vgId:%d, failed to acquire vnode while process req"
,
pHead
->
vgId
);
if
(
pMsg
->
msgType
&
1u
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
handle
,
.
code
=
TSDB_CODE_VND_INVALID_VGROUP_ID
};
rpcSendResponse
(
&
rsp
);
...
...
source/dnode/mnode/impl/src/mndAuth.c
浏览文件 @
e26710bc
...
...
@@ -16,9 +16,18 @@
#define _DEFAULT_SOURCE
#include "mndAuth.h"
int32_t
mndInitAuth
(
SMnode
*
pMnode
)
{
return
0
;
}
void
mndCleanupAuth
(
SMnode
*
pMnode
)
{}
static
int32_t
mndProcessAuthReq
(
SMnodeMsg
*
pReq
);
int32_t
mndRetriveAuth
(
SMnode
*
pMnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
int32_t
mndInitAuth
(
SMnode
*
pMnode
)
{
mndSetMsgHandle
(
pMnode
,
TDMT_MND_AUTH
,
mndProcessAuthReq
);
return
0
;
}
void
mndCleanupAuth
(
SMnode
*
pMnode
)
{}
int32_t
mndRetriveAuth
(
SMnode
*
pMnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
return
0
;
}
static
int32_t
mndProcessAuthReq
(
SMnodeMsg
*
pReq
)
{
mDebug
(
"user:%s, auth req is processed"
,
pReq
->
user
);
return
0
;
}
\ No newline at end of file
source/libs/scheduler/src/scheduler.c
浏览文件 @
e26710bc
...
...
@@ -20,12 +20,38 @@
static
SSchedulerMgmt
schMgmt
=
{
0
};
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
uint64_t
schGenUUID
(
void
)
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"Failed to get the system uid, reason:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
pTask
->
taskId
=
schGenTaskId
(
);
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
execAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d exec addrs failed"
,
SCH_MAX_CONDIDATE_EP_NUM
);
...
...
@@ -40,8 +66,7 @@ void schFreeTask(SSchTask* pTask) {
taosArrayDestroy
(
pTask
->
candidateAddrs
);
}
// TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE
//tfree(pTask->msg);
tfree
(
pTask
->
msg
);
if
(
pTask
->
children
)
{
taosArrayDestroy
(
pTask
->
children
);
...
...
@@ -71,7 +96,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%d, rspType:%d"
,
lastMsgType
,
msgType
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
&&
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%d, rspType:%d"
,
SCH_GET_TASK_STATUS
(
pTask
),
msgType
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -141,7 +166,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
SCH_ERR_JRET
(
TSDB_CODE_QRY_
APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_
JOB_FREED
);
break
;
default:
...
...
@@ -541,12 +566,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
return
TSDB_CODE_SUCCESS
;
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_FAILED
));
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
status
));
if
(
errCode
)
{
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
...
...
@@ -563,6 +585,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_FAILED
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_DROPPING
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
...
...
@@ -834,8 +867,18 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
rspCode
));
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
tfree
(
rsp
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_store_32
(
&
pJob
->
resNumOfRows
,
rsp
->
numOfRows
);
if
(
rsp
->
completed
)
{
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCCEED
);
}
SCH_ERR_JRET
(
schProcessOnDataFetched
(
pJob
));
...
...
@@ -871,7 +914,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchJob
**
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
job
||
NULL
==
(
*
job
))
{
qError
(
"QID:%"
PRIx64
" taosHashGet queryId not exist, may be dropped"
,
pParam
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_
SCH_INTERNAL_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_
QRY_JOB_FREED
);
}
pJob
=
*
job
;
...
...
@@ -880,13 +923,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t
s
=
taosHashGetSize
(
pJob
->
execTasks
);
if
(
s
<=
0
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
**
task
=
taosHashGet
(
pJob
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -1033,7 +1076,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case
TDMT_VND_CREATE_TABLE
:
case
TDMT_VND_SUBMIT
:
{
msgSize
=
pTask
->
msgLen
;
msg
=
pTask
->
msg
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
msg
,
pTask
->
msg
,
msgSize
);
break
;
}
...
...
@@ -1434,7 +1483,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
plan
->
id
.
queryId
);
pMsg
->
taskId
=
htobe64
(
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
));
pMsg
->
taskId
=
htobe64
(
schGenUUID
(
));
pMsg
->
contentLen
=
htonl
(
msgLen
);
memcpy
(
pMsg
->
msg
,
msg
,
msgLen
);
...
...
@@ -1457,6 +1506,52 @@ _return:
SCH_RET
(
code
);
}
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
)
{
if
(
NULL
==
src
||
NULL
==
dst
||
copyNum
<=
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
*
dst
=
taosArrayInit
(
copyNum
,
sizeof
(
STaskInfo
));
if
(
NULL
==
*
dst
)
{
qError
(
"taosArrayInit %d taskInfo failed"
,
copyNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
int32_t
msgSize
=
src
->
msg
->
contentLen
+
sizeof
(
*
src
->
msg
);
STaskInfo
info
=
{
0
};
info
.
addr
=
src
->
addr
;
for
(
int32_t
i
=
0
;
i
<
copyNum
;
++
i
)
{
info
.
msg
=
malloc
(
msgSize
);
if
(
NULL
==
info
.
msg
)
{
qError
(
"malloc %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
info
.
msg
,
src
->
msg
,
msgSize
);
info
.
msg
->
taskId
=
schGenUUID
();
if
(
NULL
==
taosArrayPush
(
*
dst
,
&
info
))
{
qError
(
"taosArrayPush failed, idx:%d"
,
i
);
free
(
info
.
msg
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
return
TSDB_CODE_SUCCESS
;
_return:
schedulerFreeTaskList
(
*
dst
);
*
dst
=
NULL
;
SCH_RET
(
code
);
}
int32_t
scheduleFetchRows
(
SSchJob
*
pJob
,
void
**
pData
)
{
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
...
...
@@ -1464,33 +1559,29 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
}
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%d"
,
status
);
return
TSDB_CODE_SCH_STATUS_ERROR
;
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
)
;
}
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
if
(
!
SCH_JOB_NEED_FETCH
(
&
pJob
->
attr
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
atomic_store_ptr
(
&
pJob
->
res
,
NULL
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
atomic_store_ptr
(
&
pJob
->
res
,
NULL
);
SCH_JOB_ELOG
(
"job already succeed, status:%d"
,
status
);
goto
_return
;
}
else
if
(
status
==
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
...
...
@@ -1500,15 +1591,17 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_ERR_JRET
(
code
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
)
);
}
if
(
pJob
->
res
&&
((
SRetrieveTableRsp
*
)
pJob
->
res
)
->
completed
)
{
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
_return:
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
...
...
@@ -1519,10 +1612,19 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
break
;
}
_return:
if
(
NULL
==
*
pData
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
if
(
rsp
)
{
rsp
->
completed
=
1
;
}
*
pData
=
rsp
;
}
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"fetch done, code:%x"
,
code
);
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_RET
(
code
);
...
...
@@ -1547,6 +1649,7 @@ void scheduleFreeJob(void *job) {
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
bool
setJobFree
=
false
;
if
(
SCH_GET_JOB_STATUS
(
pJob
)
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
pJob
->
queryId
,
sizeof
(
pJob
->
queryId
)))
{
...
...
@@ -1554,8 +1657,6 @@ void scheduleFreeJob(void *job) {
return
;
}
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
SCH_JOB_DLOG
(
"job removed from list, no further ref, ref:%d"
,
atomic_load_32
(
&
pJob
->
ref
));
while
(
true
)
{
...
...
@@ -1563,6 +1664,11 @@ void scheduleFreeJob(void *job) {
if
(
0
==
ref
)
{
break
;
}
else
if
(
ref
>
0
)
{
if
(
1
==
ref
&&
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
&&
!
setJobFree
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
setJobFree
=
true
;
}
usleep
(
1
);
}
else
{
assert
(
0
);
...
...
@@ -1598,6 +1704,7 @@ void scheduleFreeJob(void *job) {
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
e26710bc
...
...
@@ -38,6 +38,20 @@
namespace
{
extern
"C"
int32_t
schHandleResponseMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
);
struct
SSchJob
*
pInsertJob
=
NULL
;
struct
SSchJob
*
pQueryJob
=
NULL
;
uint64_t
schtMergeTemplateId
=
0x4
;
uint64_t
schtFetchTaskId
=
0
;
uint64_t
schtQueryId
=
1
;
bool
schtTestStop
=
false
;
bool
schtTestDeadLoop
=
false
;
int32_t
schtTestMTRunSec
=
10
;
int32_t
schtTestPrintNum
=
1000
;
int32_t
schtStartFetch
=
0
;
void
schtInitLogFile
()
{
...
...
@@ -57,7 +71,7 @@ void schtInitLogFile() {
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
uint64_t
qId
=
schtQueryId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
...
...
@@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan
->
msgType
=
TDMT_VND_QUERY
;
mergePlan
->
id
.
queryId
=
qId
;
mergePlan
->
id
.
templateId
=
0x4444444444
;
mergePlan
->
id
.
templateId
=
schtMergeTemplateId
;
mergePlan
->
id
.
subplanId
=
0x5555555555
;
mergePlan
->
type
=
QUERY_TYPE_MERGE
;
mergePlan
->
level
=
0
;
...
...
@@ -173,8 +187,6 @@ void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int6
}
void
schtSetPlanToString
()
{
static
Stub
stub
;
stub
.
set
(
qSubPlanToString
,
schtPlanToString
);
...
...
@@ -214,7 +226,12 @@ void schtSetRpcSendRequest() {
}
}
int32_t
schtAsyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
)
{
int32_t
schtAsyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
SMsgSendInfo
*
pInfo
)
{
if
(
pInfo
)
{
tfree
(
pInfo
->
param
);
tfree
(
pInfo
->
msgInfo
.
pData
);
free
(
pInfo
);
}
return
0
;
}
...
...
@@ -269,15 +286,224 @@ void *schtCreateFetchRspThread(void *param) {
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
*
rsp
),
0
);
assert
(
code
==
0
);
}
void
*
schtFetchRspThread
(
void
*
aa
)
{
SDataBuf
dataBuf
=
{
0
};
SSchCallbackParam
*
param
=
NULL
;
while
(
!
schtTestStop
)
{
if
(
0
==
atomic_val_compare_exchange_32
(
&
schtStartFetch
,
1
,
0
))
{
continue
;
}
usleep
(
1
);
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
taskId
=
schtFetchTaskId
;
int32_t
code
=
0
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
dataBuf
.
pData
=
rsp
;
dataBuf
.
len
=
sizeof
(
*
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_FETCH_RSP
,
0
);
assert
(
code
==
0
||
code
);
}
}
void
schtFreeQueryJob
(
int32_t
freeThread
)
{
static
uint32_t
freeNum
=
0
;
SSchJob
*
job
=
atomic_load_ptr
(
&
pQueryJob
);
if
(
job
&&
atomic_val_compare_exchange_ptr
(
&
pQueryJob
,
job
,
NULL
))
{
scheduleFreeJob
(
job
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
}
}
}
}
void
*
schtRunJobThread
(
void
*
aa
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SQueryDag
dag
=
{
0
};
schtInitLogFile
();
int32_t
code
=
schedulerInit
(
NULL
);
assert
(
code
==
0
);
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
SSchJob
*
job
=
NULL
;
SSchCallbackParam
*
param
=
NULL
;
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
uint32_t
jobFinished
=
0
;
while
(
!
schtTestStop
)
{
schtBuildQueryDag
(
&
dag
);
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
);
assert
(
code
==
0
);
execTasks
=
taosHashInit
(
5
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
schtFetchTaskId
=
task
->
taskId
-
1
;
taosHashPut
(
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
task
,
sizeof
(
*
task
));
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pQueryJob
=
job
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
if
(
0
==
code
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
assert
(
pRsp
->
completed
==
1
);
assert
(
pRsp
->
numOfRows
==
10
);
}
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
schtFreeQueryJob
(
0
);
taosHashCleanup
(
execTasks
);
schtFreeQueryDag
(
&
dag
);
if
(
++
jobFinished
%
schtTestPrintNum
==
0
)
{
printf
(
"jobFinished:%d
\n
"
,
jobFinished
);
}
++
schtQueryId
;
}
schedulerDestroy
();
}
void
*
schtFreeJobThread
(
void
*
aa
)
{
while
(
!
schtTestStop
)
{
usleep
(
rand
()
%
100
);
schtFreeQueryJob
(
1
);
}
}
struct
SSchJob
*
pInsertJob
=
NULL
;
}
TEST
(
queryTest
,
normalCase
)
{
...
...
@@ -368,11 +594,12 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
ASSERT_EQ
(
pRsp
->
completed
,
1
);
ASSERT_EQ
(
pRsp
->
numOfRows
,
10
);
tfree
(
data
);
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_
EQ
(
data
,
(
void
*
)
NULL
);
ASSERT_
TRUE
(
data
);
scheduleFreeJob
(
pJob
);
...
...
@@ -383,7 +610,6 @@ TEST(queryTest, normalCase) {
TEST
(
insertTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
...
...
@@ -427,13 +653,29 @@ TEST(insertTest, normalCase) {
}
TEST
(
multiThread
,
forceFree
)
{
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
schtInitLogFile
();
pthread_t
thread1
,
thread2
,
thread3
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtRunJobThread
,
NULL
);
pthread_create
(
&
(
thread2
),
&
thattr
,
schtFreeJobThread
,
NULL
);
pthread_create
(
&
(
thread3
),
&
thattr
,
schtFetchRspThread
,
NULL
);
while
(
true
)
{
if
(
schtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
schtTestMTRunSec
);
break
;
}
}
schtTestStop
=
true
;
sleep
(
3
);
}
int
main
(
int
argc
,
char
**
argv
)
{
srand
(
time
(
NULL
));
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
...
...
source/util/src/terror.c
浏览文件 @
e26710bc
...
...
@@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPING
,
"Task dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JOB_FREED
,
"Job already freed"
)
...
...
tests/script/sh/massiveTable/deployCluster.sh
浏览文件 @
e26710bc
...
...
@@ -5,6 +5,49 @@
set
-e
#set -x
masterDnode
=
slave
dataRootDir
=
"/data"
firstEp
=
"trd02:7000"
startPort
=
7000
dnodeNumber
=
1
updateSrc
=
no
while
getopts
"hm:f:n:r:p:u:"
arg
do
case
$arg
in
m
)
masterDnode
=
$(
echo
$OPTARG
)
;;
n
)
dnodeNumber
=
$(
echo
$OPTARG
)
;;
u
)
updateSrc
=
$(
echo
$OPTARG
)
;;
f
)
firstEp
=
$(
echo
$OPTARG
)
;;
p
)
startPort
=
$(
echo
$OPTARG
)
;;
r
)
dataRootDir
=
$(
echo
$OPTARG
)
;;
h
)
echo
"Usage:
`
basename
$0
`
-m [if master dnode] "
echo
" -n [ dnode number] "
echo
" -f [ first ep] "
echo
" -p [ start port] "
echo
" -r [ dnode root dir] "
exit
0
;;
?
)
#unknow option
echo
"unkonw argument"
exit
1
;;
esac
done
# deployCluster.sh
curr_dir
=
$(
readlink
-f
"
$(
dirname
"
$0
"
)
"
)
echo
$curr_dir
...
...
@@ -12,13 +55,21 @@ echo $curr_dir
${
curr_dir
}
/cleanCluster.sh
-r
"/data"
${
curr_dir
}
/cleanCluster.sh
-r
"/data2"
${
curr_dir
}
/compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
if
[[
"
${
updateSrc
}
"
==
"yes"
]]
;
then
${
curr_dir
}
/compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
fi
${
curr_dir
}
/setupDnodes.sh
-r
"/data"
-n
1
-f
"trd02:7000"
-p
7000
${
curr_dir
}
/setupDnodes.sh
-r
"/data2"
-n
1
-f
"trd02:7000"
-p
8000
${
curr_dir
}
/setupDnodes.sh
-r
"/data"
-n
${
dnodeNumber
}
-f
${
firstEp
}
-p
7000
${
curr_dir
}
/setupDnodes.sh
-r
"/data2"
-n
${
dnodeNumber
}
-f
${
firstEp
}
-p
8000
#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000
#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000
if
[[
"
${
masterDnode
}
"
==
"master"
]]
;
then
# create all dnode into cluster
taos
-s
"create dnode trd02 port 8000;"
taos
-s
"create dnode trd03 port 7000;"
taos
-s
"create dnode trd03 port 8000;"
taos
-s
"create dnode trd04 port 7000;"
taos
-s
"create dnode trd04 port 8000;"
fi
...
...
tests/test/c/create_table.c
浏览文件 @
e26710bc
...
...
@@ -46,9 +46,9 @@ typedef struct {
pthread_t
thread
;
}
SThreadInfo
;
//void parseArgument(int32_t argc, char *argv[]);
//void *threadFunc(void *param);
//void createDbAndStb();
//
void parseArgument(int32_t argc, char *argv[]);
//
void *threadFunc(void *param);
//
void createDbAndStb();
void
createDbAndStb
()
{
pPrint
(
"start to create db and stable"
);
...
...
@@ -64,7 +64,8 @@ void createDbAndStb() {
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
int32_t
code
=
taos_errno
(
pRes
);
if
(
code
!=
0
)
{
pError
(
"failed to create database:%s, sql:%s, code:%d reason:%s"
,
dbName
,
qstr
,
taos_errno
(
pRes
),
taos_errstr
(
pRes
));
pError
(
"failed to create database:%s, sql:%s, code:%d reason:%s"
,
dbName
,
qstr
,
taos_errno
(
pRes
),
taos_errstr
(
pRes
));
exit
(
0
);
}
taos_free_result
(
pRes
);
...
...
@@ -129,10 +130,9 @@ static int64_t getResult(TAOS_RES *tres) {
return
numOfRows
;
}
void
showTables
()
{
void
showTables
()
{
pPrint
(
"start to show tables"
);
char
qstr
[
32
];
char
qstr
[
128
];
TAOS
*
con
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
con
==
NULL
)
{
...
...
@@ -140,9 +140,9 @@ void showTables() {
exit
(
1
);
}
s
printf
(
qstr
,
"use %s"
,
dbName
);
s
nprintf
(
qstr
,
128
,
"use %s"
,
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
int
code
=
taos_errno
(
pRes
);
int
code
=
taos_errno
(
pRes
);
if
(
code
!=
0
)
{
pError
(
"failed to use db, code:%d reason:%s"
,
taos_errno
(
pRes
),
taos_errstr
(
pRes
));
exit
(
1
);
...
...
@@ -160,12 +160,11 @@ void showTables() {
int64_t
totalTableNum
=
getResult
(
pRes
);
taos_free_result
(
pRes
);
pPrint
(
"%s database: %s, total %"
PRId64
" tables %s"
,
GREEN
,
dbName
,
totalTableNum
,
NC
);
pPrint
(
"%s database: %s, total %"
PRId64
" tables %s"
,
GREEN
,
dbName
,
totalTableNum
,
NC
);
taos_close
(
con
);
}
void
*
threadFunc
(
void
*
param
)
{
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
char
*
qstr
=
malloc
(
2000
*
1000
);
...
...
@@ -177,48 +176,48 @@ void *threadFunc(void *param) {
exit
(
1
);
}
//printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex);
// printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex,
// pInfo->tableEndIndex);
sprintf
(
qstr
,
"use %s"
,
pInfo
->
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
taos_free_result
(
pRes
);
if
(
createTable
)
{
int64_t
curMs
=
0
;
int64_t
beginMs
=
taosGetTimestampMs
();
pInfo
->
startMs
=
beginMs
;
int64_t
t
=
pInfo
->
tableBeginIndex
;
int64_t
t
=
pInfo
->
tableBeginIndex
;
for
(;
t
<=
pInfo
->
tableEndIndex
;)
{
//int64_t batch = (pInfo->tableEndIndex - t);
//batch = MIN(batch, batchNum);
//
int64_t batch = (pInfo->tableEndIndex - t);
//
batch = MIN(batch, batchNum);
int32_t
len
=
sprintf
(
qstr
,
"create table"
);
for
(
int32_t
i
=
0
;
i
<
batchNum
;)
{
len
+=
sprintf
(
qstr
+
len
,
" %s_t%"
PRId64
" using %s tags(%"
PRId64
")"
,
stbName
,
t
,
stbName
,
t
);
t
++
;
i
++
;
t
++
;
i
++
;
if
(
t
>
pInfo
->
tableEndIndex
)
{
break
;
}
break
;
}
}
int64_t
startTs
=
taosGetTimestampUs
();
int64_t
startTs
=
taosGetTimestampUs
();
TAOS_RES
*
pRes
=
taos_query
(
con
,
qstr
);
code
=
taos_errno
(
pRes
);
if
(
code
!=
0
)
{
pError
(
"failed to create table t%"
PRId64
",
reason:%s"
,
t
,
tstrerror
(
code
));
if
(
(
code
!=
0
)
&&
(
code
!=
TSDB_CODE_RPC_AUTH_REQUIRED
)
)
{
pError
(
"failed to create table t%"
PRId64
",
code: %d, reason:%s"
,
t
,
code
,
tstrerror
(
code
));
}
taos_free_result
(
pRes
);
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
//
printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if
(
delay
>
pInfo
->
maxDelay
)
pInfo
->
maxDelay
=
delay
;
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
delay
=
endTs
-
startTs
;
//
printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if
(
delay
>
pInfo
->
maxDelay
)
pInfo
->
maxDelay
=
delay
;
if
(
delay
<
pInfo
->
minDelay
)
pInfo
->
minDelay
=
delay
;
curMs
=
taosGetTimestampMs
();
if
(
curMs
-
beginMs
>
10000
)
{
beginMs
=
curMs
;
//
printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
curMs
=
taosGetTimestampMs
();
if
(
curMs
-
beginMs
>
10000
)
{
beginMs
=
curMs
;
//
printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printCreateProgress
(
pInfo
,
t
);
}
}
...
...
@@ -227,7 +226,7 @@ void *threadFunc(void *param) {
if
(
insertData
)
{
int64_t
curMs
=
0
;
int64_t
beginMs
=
taosGetTimestampMs
();
;
int64_t
beginMs
=
taosGetTimestampMs
();
pInfo
->
startMs
=
taosGetTimestampMs
();
for
(
int64_t
t
=
pInfo
->
tableBeginIndex
;
t
<
pInfo
->
tableEndIndex
;
++
t
)
{
...
...
@@ -247,7 +246,7 @@ void *threadFunc(void *param) {
taos_free_result
(
pRes
);
curMs
=
taosGetTimestampMs
();
if
(
curMs
-
beginMs
>
10000
)
{
if
(
curMs
-
beginMs
>
10000
)
{
printInsertProgress
(
pInfo
,
t
);
}
t
+=
(
batch
-
1
);
...
...
@@ -335,33 +334,32 @@ int32_t main(int32_t argc, char *argv[]) {
parseArgument
(
argc
,
argv
);
if
(
showTablesFlag
)
{
showTables
();
return
0
;
showTables
();
return
0
;
}
createDbAndStb
();
pPrint
(
"%d threads are spawned to create %
d tables"
,
numOfThreads
,
numOfThread
s
);
pPrint
(
"%d threads are spawned to create %
"
PRId64
" tables"
,
numOfThreads
,
numOfTable
s
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
calloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
//int64_t numOfTablesPerThread = numOfTables / numOfThreads;
//numOfTables = numOfTablesPerThread * numOfThreads;
// int64_t numOfTablesPerThread = numOfTables / numOfThreads;
// numOfTables = numOfTablesPerThread * numOfThreads;
if
(
numOfThreads
<
1
)
{
numOfThreads
=
1
;
}
int64_t
a
=
numOfTables
/
numOfThreads
;
if
(
a
<
1
)
{
numOfThreads
=
numOfTables
;
a
=
1
;
numOfThreads
=
numOfTables
;
a
=
1
;
}
int64_t
b
=
0
;
b
=
numOfTables
%
numOfThreads
;
...
...
@@ -371,7 +369,7 @@ int32_t main(int32_t argc, char *argv[]) {
pInfo
[
i
].
tableEndIndex
=
i
<
b
?
tableFrom
+
a
:
tableFrom
+
a
-
1
;
tableFrom
=
pInfo
[
i
].
tableEndIndex
+
1
;
pInfo
[
i
].
threadIndex
=
i
;
pInfo
[
i
].
minDelay
=
INT64_MAX
;
pInfo
[
i
].
minDelay
=
INT64_MAX
;
strcpy
(
pInfo
[
i
].
dbName
,
dbName
);
strcpy
(
pInfo
[
i
].
stbName
,
stbName
);
pthread_create
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
...
...
@@ -390,7 +388,7 @@ int32_t main(int32_t argc, char *argv[]) {
createTableSpeed
+=
pInfo
[
i
].
createTableSpeed
;
if
(
pInfo
[
i
].
maxDelay
>
maxDelay
)
maxDelay
=
pInfo
[
i
].
maxDelay
;
if
(
pInfo
[
i
].
minDelay
<
minDelay
)
minDelay
=
pInfo
[
i
].
minDelay
;
if
(
pInfo
[
i
].
minDelay
<
minDelay
)
minDelay
=
pInfo
[
i
].
minDelay
;
}
float
insertDataSpeed
=
0
;
...
...
@@ -398,21 +396,15 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed
+=
pInfo
[
i
].
insertDataSpeed
;
}
pPrint
(
"%s total %"
PRId64
" tables, %.1f tables/second, threads:%d, maxDelay: %"
PRId64
"us, minDelay: %"
PRId64
"us %s"
,
GREEN
,
numOfTables
,
createTableSpeed
,
numOfThreads
,
maxDelay
,
minDelay
,
NC
);
pPrint
(
"%s total %"
PRId64
" tables, %.1f tables/second, threads:%d, maxDelay: %"
PRId64
"us, minDelay: %"
PRId64
"us %s"
,
GREEN
,
numOfTables
,
createTableSpeed
,
numOfThreads
,
maxDelay
,
minDelay
,
NC
);
if
(
insertData
)
{
pPrint
(
"%s total %"
PRId64
" tables, %.1f rows/second, threads:%d %s"
,
GREEN
,
numOfTables
,
insertDataSpeed
,
numOfThreads
,
NC
);
pPrint
(
"%s total %"
PRId64
" tables, %.1f rows/second, threads:%d %s"
,
GREEN
,
numOfTables
,
insertDataSpeed
,
numOfThreads
,
NC
);
}
pthread_attr_destroy
(
&
thattr
);
free
(
pInfo
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录