Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e6f05d38
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e6f05d38
编写于
1月 22, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
e4338377
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
409 addition
and
58 deletion
+409
-58
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+1
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+64
-18
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+6
-1
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+337
-38
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-1
未找到文件。
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
e6f05d38
...
...
@@ -111,6 +111,7 @@ typedef struct SQWTaskCtx {
void
*
cancelConnection
;
bool
emptyRes
;
bool
multiExec
;
int8_t
queryContinue
;
int8_t
queryInQueue
;
int32_t
rspCode
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
e6f05d38
...
...
@@ -53,6 +53,12 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FAILED
:
case
JOB_TASK_STATUS_CANCELLING
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
...
...
@@ -249,7 +255,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
*
ctx
=
taosHashAcquire
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
//QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_
ELOG
(
"ctx not in ctxHash, id:%s"
,
id
);
QW_TASK_
DLOG_E
(
"task ctx not exist, may be dropped"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
...
...
@@ -262,7 +268,7 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
QW_TASK_
ELOG
(
"ctx not in ctxHash, ctxHashSize:%d"
,
taosHashGetSize
(
mgmt
->
ctxHash
)
);
QW_TASK_
DLOG_E
(
"task ctx not exist, may be dropped"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
}
...
...
@@ -548,6 +554,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
return
TSDB_CODE_SUCCESS
;
}
pOutput
->
bufStatus
=
DS_BUF_EMPTY
;
QW_TASK_DLOG
(
"no res data in sink, need response later, queryEnd:%d"
,
queryEnd
);
...
...
@@ -605,10 +613,10 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
locked
=
true
;
atomic_store_32
(
&
ctx
->
phase
,
phase
);
switch
(
phase
)
{
case
QW_PHASE_PRE_QUERY
:
{
atomic_store_8
(
&
ctx
->
phase
,
phase
);
atomic_store_8
(
&
ctx
->
taskType
,
input
->
taskType
);
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
)
||
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_DROP
))
{
...
...
@@ -706,6 +714,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break
;
}
case
QW_PHASE_PRE_CQUERY
:
{
atomic_store_8
(
&
ctx
->
phase
,
phase
);
if
(
QW_IS_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_WLOG
(
"task already cancelled, phase:%d"
,
phase
);
output
->
needStop
=
true
;
...
...
@@ -721,17 +731,33 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_DROP
))
{
QW_TASK_ELOG
(
"drop event at wrong phase, phase:%d"
,
phase
);
QW_ERR_JRET
(
qwDropTaskStatus
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwDropTaskCtx
(
QW_FPARAMS
(),
QW_WRITE
));
output
->
rspCode
=
TSDB_CODE_QRY_TASK_DROPPED
;
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_STATUS_ERROR
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
QW_SET_RSP_CODE
(
ctx
,
output
->
rspCode
);
dropConnection
=
ctx
->
dropConnection
;
// Note: ctx freed, no need to unlock it
locked
=
false
;
QW_ERR_JRET
(
output
->
rspCode
);
}
else
if
(
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_CANCEL
))
{
QW_TASK_ELOG
(
"cancel event at wrong phase, phase:%d"
,
phase
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_STATUS_ERROR
;
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_CANCELLED
));
qwFreeTask
(
QW_FPARAMS
(),
ctx
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_CANCEL
);
output
->
needStop
=
true
;
output
->
rspCode
=
TSDB_CODE_QRY_TASK_CANCELLED
;
QW_SET_RSP_CODE
(
ctx
,
output
->
rspCode
);
cancelConnection
=
ctx
->
cancelConnection
;
QW_ERR_JRET
(
output
->
rspCode
);
}
if
(
ctx
->
rspCode
)
{
QW_TASK_ELOG
(
"task already failed, code:%x, phase:%d"
,
ctx
->
rspCode
,
phase
);
output
->
needStop
=
true
;
...
...
@@ -874,7 +900,9 @@ _return:
QW_UPDATE_RSP_CODE
(
ctx
,
output
->
rspCode
);
}
atomic_store_32
(
&
ctx
->
phase
,
phase
);
if
(
QW_PHASE_POST_FETCH
!=
phase
)
{
atomic_store_8
(
&
ctx
->
phase
,
phase
);
}
if
(
locked
)
{
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
@@ -1063,6 +1091,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET
(
qwGetTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
atomic_store_8
(
&
ctx
->
queryInQueue
,
0
);
atomic_store_8
(
&
ctx
->
queryContinue
,
0
);
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
...
...
@@ -1078,6 +1107,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
// RC WARNING
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
if
(
sOutput
.
queryEnd
)
{
needStop
=
true
;
}
if
(
rsp
)
{
qwBuildFetchRsp
(
rsp
,
&
sOutput
,
dataLen
);
...
...
@@ -1093,6 +1126,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
_return:
if
(
NULL
==
ctx
)
{
break
;
}
if
(
code
&&
QW_IS_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwFreeFetchRsp
(
rsp
);
...
...
@@ -1101,12 +1138,18 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_TASK_DLOG
(
"fetch msg rsped, code:%x, dataLen:%d"
,
code
,
0
);
}
input
.
code
=
code
;
qwHandlePostPhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
needStop
||
code
||
0
==
atomic_load_8
(
&
ctx
->
queryContinue
))
{
atomic_store_8
(
&
ctx
->
phase
,
0
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
break
;
}
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
}
while
(
true
);
needStop
=
output
.
needStop
;
code
=
output
.
rspCode
;
}
while
((
!
needStop
)
&&
(
0
==
code
)
&&
atomic_val_compare_exchange_8
(
&
ctx
->
queryContinue
,
1
,
0
));
input
.
code
=
code
;
qwHandlePostPhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_POST_CQUERY
,
&
input
,
&
output
);
QW_RET
(
code
);
}
...
...
@@ -1159,7 +1202,10 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if
(
QW_IS_QUERY_RUNNING
(
ctx
))
{
atomic_store_8
(
&
ctx
->
queryContinue
,
1
);
}
else
if
(
0
==
atomic_load_8
(
&
ctx
->
queryInQueue
))
{
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
if
(
!
ctx
->
multiExec
)
{
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXECUTING
));
ctx
->
multiExec
=
true
;
}
atomic_store_8
(
&
ctx
->
queryInQueue
,
1
);
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
e6f05d38
...
...
@@ -50,6 +50,7 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -68,6 +69,7 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_RES_READY_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -98,7 +100,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
pMsg
->
msgType
+
1
,
.
msgType
=
TDMT_VND_TASKS_STATUS_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -121,6 +123,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_FETCH_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -138,6 +141,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_CANCEL_TASK_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -155,6 +159,7 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_DROP_TASK_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
e6f05d38
...
...
@@ -38,21 +38,25 @@
namespace
{
#define qwtTestQueryQueueSize 1000
#define qwtTestFetchQueueSize 1000
#define qwtTestMaxExecTaskUsec 2
#define qwtTestQueryQueueSize 1000000
#define qwtTestFetchQueueSize 1000000
int32_t
qwtTestMaxExecTaskUsec
=
2
;
int32_t
qwtTestReqMaxDelayUsec
=
2
;
uint64_t
qwtTestQueryId
=
0
;
bool
qwtTestEnableSleep
=
true
;
bool
qwtTestStop
=
false
;
bool
qwtTestDeadLoop
=
tru
e
;
int32_t
qwtTestMTRunSec
=
1
0
;
bool
qwtTestDeadLoop
=
fals
e
;
int32_t
qwtTestMTRunSec
=
6
0
;
int32_t
qwtTestPrintNum
=
100000
;
int32_t
qwtTestCaseIdx
=
0
;
int32_t
qwtTestCaseNum
=
4
;
bool
qwtTestCaseFinished
=
false
;
tsem_t
qwtTestQuerySem
;
tsem_t
qwtTestFetchSem
;
int32_t
qwtTestQuitThreadNum
=
0
;
int32_t
qwtTestQueryQueueRIdx
=
0
;
int32_t
qwtTestQueryQueueWIdx
=
0
;
...
...
@@ -104,6 +108,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg
.
sId
=
htobe64
(
1
);
qwtqueryMsg
.
taskId
=
htobe64
(
1
);
qwtqueryMsg
.
contentLen
=
htonl
(
100
);
queryRpc
->
msgType
=
TDMT_VND_QUERY
;
queryRpc
->
pCont
=
&
qwtqueryMsg
;
queryRpc
->
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
}
...
...
@@ -112,6 +117,7 @@ void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
readyMsg
->
sId
=
htobe64
(
1
);
readyMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
readyMsg
->
taskId
=
htobe64
(
1
);
readyRpc
->
msgType
=
TDMT_VND_RES_READY
;
readyRpc
->
pCont
=
readyMsg
;
readyRpc
->
contLen
=
sizeof
(
SResReadyReq
);
}
...
...
@@ -120,6 +126,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg
->
sId
=
htobe64
(
1
);
fetchMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
fetchMsg
->
taskId
=
htobe64
(
1
);
fetchRpc
->
msgType
=
TDMT_VND_FETCH
;
fetchRpc
->
pCont
=
fetchMsg
;
fetchRpc
->
contLen
=
sizeof
(
SResFetchReq
);
}
...
...
@@ -128,6 +135,7 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg
->
sId
=
htobe64
(
1
);
dropMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
dropMsg
->
taskId
=
htobe64
(
1
);
dropRpc
->
msgType
=
TDMT_VND_DROP_TASK
;
dropRpc
->
pCont
=
dropMsg
;
dropRpc
->
contLen
=
sizeof
(
STaskDropReq
);
}
...
...
@@ -146,7 +154,9 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
int32_t
qwtPutReqToFetchQueue
(
void
*
node
,
struct
SRpcMsg
*
pMsg
)
{
taosWLockLatch
(
&
qwtTestFetchQueueLock
);
qwtTestFetchQueue
[
qwtTestFetchQueueWIdx
++
]
=
pMsg
;
struct
SRpcMsg
*
newMsg
=
(
struct
SRpcMsg
*
)
calloc
(
1
,
sizeof
(
struct
SRpcMsg
));
memcpy
(
newMsg
,
pMsg
,
sizeof
(
struct
SRpcMsg
));
qwtTestFetchQueue
[
qwtTestFetchQueueWIdx
++
]
=
newMsg
;
if
(
qwtTestFetchQueueWIdx
>=
qwtTestFetchQueueSize
)
{
qwtTestFetchQueueWIdx
=
0
;
}
...
...
@@ -167,7 +177,9 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
int32_t
qwtPutReqToQueue
(
void
*
node
,
struct
SRpcMsg
*
pMsg
)
{
taosWLockLatch
(
&
qwtTestQueryQueueLock
);
qwtTestQueryQueue
[
qwtTestQueryQueueWIdx
++
]
=
pMsg
;
struct
SRpcMsg
*
newMsg
=
(
struct
SRpcMsg
*
)
calloc
(
1
,
sizeof
(
struct
SRpcMsg
));
memcpy
(
newMsg
,
pMsg
,
sizeof
(
struct
SRpcMsg
));
qwtTestQueryQueue
[
qwtTestQueryQueueWIdx
++
]
=
newMsg
;
if
(
qwtTestQueryQueueWIdx
>=
qwtTestQueryQueueSize
)
{
qwtTestQueryQueueWIdx
=
0
;
}
...
...
@@ -201,6 +213,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtdropRpc
);
}
rpcFreeCont
(
rsp
);
break
;
}
case
TDMT_VND_RES_READY_RSP
:
{
...
...
@@ -213,6 +226,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
qwtdropRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtdropRpc
);
}
rpcFreeCont
(
rsp
);
break
;
}
case
TDMT_VND_FETCH_RSP
:
{
...
...
@@ -226,16 +240,19 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtBuildDropReqMsg
(
&
qwtdropMsg
,
&
qwtdropRpc
);
qwtPutReqToFetchQueue
((
void
*
)
0x1
,
&
qwtdropRpc
);
rpcFreeCont
(
rsp
);
break
;
}
case
TDMT_VND_DROP_TASK
:
{
case
TDMT_VND_DROP_TASK
_RSP
:
{
STaskDropRsp
*
rsp
=
(
STaskDropRsp
*
)
pRsp
->
pCont
;
rpcFreeCont
(
rsp
);
qwtTestCaseFinished
=
true
;
break
;
}
}
return
;
}
...
...
@@ -271,16 +288,30 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*
pRes
=
NULL
;
*
useconds
=
0
;
}
else
{
if
(
qwtTestSinkQueryEnd
)
{
*
pRes
=
NULL
;
*
useconds
=
rand
()
%
10
;
return
0
;
}
endExec
=
rand
()
%
5
;
if
(
endExec
)
{
usleep
(
rand
()
%
qwtTestMaxExecTaskUsec
);
int32_t
runTime
=
0
;
if
(
qwtTestEnableSleep
&&
qwtTestMaxExecTaskUsec
>
0
)
{
runTime
=
rand
()
%
qwtTestMaxExecTaskUsec
;
}
if
(
qwtTestEnableSleep
)
{
if
(
runTime
)
{
usleep
(
runTime
);
}
}
if
(
endExec
)
{
*
pRes
=
(
SSDataBlock
*
)
calloc
(
1
,
sizeof
(
SSDataBlock
));
(
*
pRes
)
->
info
.
rows
=
rand
()
%
1000
;
}
else
{
*
pRes
=
NULL
;
usleep
(
rand
()
%
qwtTestMaxExecTaskUsec
);
*
useconds
=
rand
()
%
10
;
}
}
...
...
@@ -308,9 +339,9 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p
qwtTestSinkBlockNum
++
;
if
(
qwtTestSinkBlockNum
>=
qwtTestSinkMaxBlockNum
)
{
*
pContinue
=
true
;
}
else
{
*
pContinue
=
false
;
}
else
{
*
pContinue
=
true
;
}
taosWUnLockLatch
(
&
qwtTestSinkLock
);
...
...
@@ -653,7 +684,7 @@ void *statusThread(void *param) {
}
void
*
clientThread
(
void
*
param
)
{
void
*
qwt
clientThread
(
void
*
param
)
{
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mgmt
=
param
;
...
...
@@ -672,15 +703,14 @@ void *clientThread(void *param) {
usleep
(
1
);
}
if
(
qwtTestEnableSleep
)
{
usleep
(
rand
()
%
5
);
}
if
(
++
n
%
qwtTestPrintNum
==
0
)
{
printf
(
"
query
:%d
\n
"
,
n
);
printf
(
"
case run
:%d
\n
"
,
n
);
}
}
atomic_add_fetch_32
(
&
qwtTestQuitThreadNum
,
1
);
return
NULL
;
}
...
...
@@ -689,9 +719,13 @@ void *queryQueueThread(void *param) {
SRpcMsg
*
queryRpc
=
NULL
;
void
*
mgmt
=
param
;
while
(
!
qwtTestStop
)
{
while
(
true
)
{
tsem_wait
(
&
qwtTestQuerySem
);
if
(
qwtTestStop
&&
qwtTestQueryQueueNum
<=
0
)
{
break
;
}
taosWLockLatch
(
&
qwtTestQueryQueueLock
);
if
(
qwtTestQueryQueueNum
<=
0
||
qwtTestQueryQueueRIdx
==
qwtTestQueryQueueWIdx
)
{
printf
(
"query queue is empty
\n
"
);
...
...
@@ -707,6 +741,15 @@ void *queryQueueThread(void *param) {
qwtTestQueryQueueNum
--
;
taosWUnLockLatch
(
&
qwtTestQueryQueueLock
);
if
(
qwtTestEnableSleep
&&
qwtTestReqMaxDelayUsec
>
0
)
{
int32_t
delay
=
rand
()
%
qwtTestReqMaxDelayUsec
;
if
(
delay
)
{
usleep
(
delay
);
}
}
if
(
TDMT_VND_QUERY
==
queryRpc
->
msgType
)
{
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
queryRpc
);
}
else
if
(
TDMT_VND_QUERY_CONTINUE
==
queryRpc
->
msgType
)
{
...
...
@@ -715,8 +758,16 @@ void *queryQueueThread(void *param) {
printf
(
"unknown msg in query queue, type:%d
\n
"
,
queryRpc
->
msgType
);
assert
(
0
);
}
free
(
queryRpc
);
if
(
qwtTestStop
&&
qwtTestQueryQueueNum
<=
0
)
{
break
;
}
}
atomic_add_fetch_32
(
&
qwtTestQuitThreadNum
,
1
);
return
NULL
;
}
...
...
@@ -725,7 +776,7 @@ void *fetchQueueThread(void *param) {
SRpcMsg
*
fetchRpc
=
NULL
;
void
*
mgmt
=
param
;
while
(
!
qwtTestStop
)
{
while
(
true
)
{
tsem_wait
(
&
qwtTestFetchSem
);
taosWLockLatch
(
&
qwtTestFetchQueueLock
);
...
...
@@ -743,23 +794,45 @@ void *fetchQueueThread(void *param) {
qwtTestFetchQueueNum
--
;
taosWUnLockLatch
(
&
qwtTestFetchQueueLock
);
if
(
qwtTestEnableSleep
&&
qwtTestReqMaxDelayUsec
>
0
)
{
int32_t
delay
=
rand
()
%
qwtTestReqMaxDelayUsec
;
if
(
delay
)
{
usleep
(
delay
);
}
}
switch
(
fetchRpc
->
msgType
)
{
case
TDMT_VND_FETCH
:
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_RES_READY
:
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_TASKS_STATUS
:
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_CANCEL_TASK
:
qWorkerProcessCancelMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
case
TDMT_VND_DROP_TASK
:
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
fetchRpc
);
break
;
default:
printf
(
"unknown msg type:%d in fetch queue"
,
fetchRpc
->
msgType
);
assert
(
0
);
break
;
}
free
(
fetchRpc
);
if
(
qwtTestStop
&&
qwtTestFetchQueueNum
<=
0
)
{
break
;
}
}
atomic_add_fetch_32
(
&
qwtTestQuitThreadNum
,
1
);
return
NULL
;
}
...
...
@@ -767,6 +840,7 @@ void *fetchQueueThread(void *param) {
}
#if 0
TEST(seqTest, normalCase) {
void *mgmt = NULL;
...
...
@@ -800,31 +874,15 @@ TEST(seqTest, normalCase) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0);
qwtBuildStatusReqMsg
(
&
qwtstatusMsg
,
&
statusRpc
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
...
...
@@ -914,19 +972,31 @@ TEST(seqTest, randCase) {
printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
if (qwtTestEnableSleep) {
usleep(1);
}
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
if (qwtTestEnableSleep) {
usleep(1);
}
} else {
printf("QUIT RAND NOW");
break;
...
...
@@ -976,7 +1046,236 @@ TEST(seqTest, multithreadRand) {
qWorkerDestroy(&mgmt);
}
TEST
(
rcTest
,
multithread
)
{
#endif
TEST
(
rcTest
,
shortExecshortDelay
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
qwtInitLogFile
();
stubSetStringToPlan
();
stubSetRpcSendResponse
();
stubSetExecTask
();
stubSetCreateExecTask
();
stubSetAsyncKillTask
();
stubSetDestroyTask
();
stubSetDestroyDataSinker
();
stubSetGetDataLength
();
stubSetEndPut
();
stubSetPutDataBlock
();
stubSetGetDataBlock
();
srand
(
time
(
NULL
));
qwtTestStop
=
false
;
qwtTestQuitThreadNum
=
0
;
code
=
qWorkerInit
(
NODE_TYPE_VNODE
,
1
,
NULL
,
&
mgmt
,
mockPointer
,
qwtPutReqToQueue
);
ASSERT_EQ
(
code
,
0
);
qwtTestMaxExecTaskUsec
=
0
;
qwtTestReqMaxDelayUsec
=
0
;
tsem_init
(
&
qwtTestQuerySem
,
0
,
0
);
tsem_init
(
&
qwtTestFetchSem
,
0
,
0
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
t1
,
t2
,
t3
,
t4
,
t5
;
pthread_create
(
&
(
t1
),
&
thattr
,
qwtclientThread
,
mgmt
);
pthread_create
(
&
(
t2
),
&
thattr
,
queryQueueThread
,
mgmt
);
pthread_create
(
&
(
t3
),
&
thattr
,
fetchQueueThread
,
mgmt
);
while
(
true
)
{
if
(
qwtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
qwtTestMTRunSec
);
break
;
}
}
qwtTestStop
=
true
;
while
(
true
)
{
if
(
qwtTestQuitThreadNum
==
3
)
{
break
;
}
sleep
(
3
);
tsem_post
(
&
qwtTestQuerySem
);
usleep
(
10
);
}
qwtTestQueryQueueNum
=
0
;
qwtTestQueryQueueRIdx
=
0
;
qwtTestQueryQueueWIdx
=
0
;
qwtTestQueryQueueLock
=
0
;
qwtTestFetchQueueNum
=
0
;
qwtTestFetchQueueRIdx
=
0
;
qwtTestFetchQueueWIdx
=
0
;
qwtTestFetchQueueLock
=
0
;
qWorkerDestroy
(
&
mgmt
);
}
TEST
(
rcTest
,
longExecshortDelay
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
qwtInitLogFile
();
stubSetStringToPlan
();
stubSetRpcSendResponse
();
stubSetExecTask
();
stubSetCreateExecTask
();
stubSetAsyncKillTask
();
stubSetDestroyTask
();
stubSetDestroyDataSinker
();
stubSetGetDataLength
();
stubSetEndPut
();
stubSetPutDataBlock
();
stubSetGetDataBlock
();
srand
(
time
(
NULL
));
qwtTestStop
=
false
;
qwtTestQuitThreadNum
=
0
;
code
=
qWorkerInit
(
NODE_TYPE_VNODE
,
1
,
NULL
,
&
mgmt
,
mockPointer
,
qwtPutReqToQueue
);
ASSERT_EQ
(
code
,
0
);
qwtTestMaxExecTaskUsec
=
1000000
;
qwtTestReqMaxDelayUsec
=
0
;
tsem_init
(
&
qwtTestQuerySem
,
0
,
0
);
tsem_init
(
&
qwtTestFetchSem
,
0
,
0
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
t1
,
t2
,
t3
,
t4
,
t5
;
pthread_create
(
&
(
t1
),
&
thattr
,
qwtclientThread
,
mgmt
);
pthread_create
(
&
(
t2
),
&
thattr
,
queryQueueThread
,
mgmt
);
pthread_create
(
&
(
t3
),
&
thattr
,
fetchQueueThread
,
mgmt
);
while
(
true
)
{
if
(
qwtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
qwtTestMTRunSec
);
break
;
}
}
qwtTestStop
=
true
;
while
(
true
)
{
if
(
qwtTestQuitThreadNum
==
3
)
{
break
;
}
sleep
(
3
);
tsem_post
(
&
qwtTestQuerySem
);
usleep
(
10
);
}
qwtTestQueryQueueNum
=
0
;
qwtTestQueryQueueRIdx
=
0
;
qwtTestQueryQueueWIdx
=
0
;
qwtTestQueryQueueLock
=
0
;
qwtTestFetchQueueNum
=
0
;
qwtTestFetchQueueRIdx
=
0
;
qwtTestFetchQueueWIdx
=
0
;
qwtTestFetchQueueLock
=
0
;
qWorkerDestroy
(
&
mgmt
);
}
TEST
(
rcTest
,
shortExeclongDelay
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
qwtInitLogFile
();
stubSetStringToPlan
();
stubSetRpcSendResponse
();
stubSetExecTask
();
stubSetCreateExecTask
();
stubSetAsyncKillTask
();
stubSetDestroyTask
();
stubSetDestroyDataSinker
();
stubSetGetDataLength
();
stubSetEndPut
();
stubSetPutDataBlock
();
stubSetGetDataBlock
();
srand
(
time
(
NULL
));
qwtTestStop
=
false
;
qwtTestQuitThreadNum
=
0
;
code
=
qWorkerInit
(
NODE_TYPE_VNODE
,
1
,
NULL
,
&
mgmt
,
mockPointer
,
qwtPutReqToQueue
);
ASSERT_EQ
(
code
,
0
);
qwtTestMaxExecTaskUsec
=
0
;
qwtTestReqMaxDelayUsec
=
1000000
;
tsem_init
(
&
qwtTestQuerySem
,
0
,
0
);
tsem_init
(
&
qwtTestFetchSem
,
0
,
0
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
t1
,
t2
,
t3
,
t4
,
t5
;
pthread_create
(
&
(
t1
),
&
thattr
,
qwtclientThread
,
mgmt
);
pthread_create
(
&
(
t2
),
&
thattr
,
queryQueueThread
,
mgmt
);
pthread_create
(
&
(
t3
),
&
thattr
,
fetchQueueThread
,
mgmt
);
while
(
true
)
{
if
(
qwtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
qwtTestMTRunSec
);
break
;
}
}
qwtTestStop
=
true
;
while
(
true
)
{
if
(
qwtTestQuitThreadNum
==
3
)
{
break
;
}
sleep
(
3
);
tsem_post
(
&
qwtTestQuerySem
);
usleep
(
10
);
}
qwtTestQueryQueueNum
=
0
;
qwtTestQueryQueueRIdx
=
0
;
qwtTestQueryQueueWIdx
=
0
;
qwtTestQueryQueueLock
=
0
;
qwtTestFetchQueueNum
=
0
;
qwtTestFetchQueueRIdx
=
0
;
qwtTestFetchQueueWIdx
=
0
;
qwtTestFetchQueueLock
=
0
;
qWorkerDestroy
(
&
mgmt
);
}
#if 0
TEST(rcTest, dropTest) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
...
...
@@ -1025,7 +1324,7 @@ TEST(rcTest, multithread) {
qWorkerDestroy(&mgmt);
}
#endif
int
main
(
int
argc
,
char
**
argv
)
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
e6f05d38
...
...
@@ -891,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break
;
}
case
TDMT_VND_DROP_TASK
:
{
case
TDMT_VND_DROP_TASK
_RSP
:
{
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, ref:%d"
,
atomic_load_32
(
&
pJob
->
ref
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录