Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e2d8dfaf
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e2d8dfaf
编写于
1月 18, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
0916126c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
359 addition
and
23 deletion
+359
-23
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+8
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+104
-19
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+245
-4
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
e2d8dfaf
...
...
@@ -114,6 +114,14 @@ void schedulerDestroy(void);
*/
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
);
/**
* make one task info's multiple copies
* @param src
* @param dst SArray**<STaskInfo>
* @return
*/
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
);
void
schedulerFreeTaskList
(
SArray
*
taskList
);
...
...
include/util/taoserror.h
浏览文件 @
e2d8dfaf
...
...
@@ -361,6 +361,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
e2d8dfaf
...
...
@@ -20,12 +20,38 @@
static
SSchedulerMgmt
schMgmt
=
{
0
};
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
uint64_t
schGenUUID
(
void
)
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"Failed to get the system uid, reason:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
pTask
->
taskId
=
schGenTaskId
(
);
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
execAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d exec addrs failed"
,
SCH_MAX_CONDIDATE_EP_NUM
);
...
...
@@ -41,7 +67,7 @@ void schFreeTask(SSchTask* pTask) {
}
// TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE
//
tfree(pTask->msg);
tfree
(
pTask
->
msg
);
if
(
pTask
->
children
)
{
taosArrayDestroy
(
pTask
->
children
);
...
...
@@ -141,7 +167,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
SCH_ERR_JRET
(
TSDB_CODE_QRY_
APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_
JOB_FREED
);
break
;
default:
...
...
@@ -541,12 +567,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
return
TSDB_CODE_SUCCESS
;
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_FAILED
));
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
status
));
if
(
errCode
)
{
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
...
...
@@ -563,6 +586,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_FAILED
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_DROPPING
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
...
...
@@ -871,7 +905,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchJob
**
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
job
||
NULL
==
(
*
job
))
{
qError
(
"QID:%"
PRIx64
" taosHashGet queryId not exist, may be dropped"
,
pParam
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_
SCH_INTERNAL_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_
QRY_JOB_FREED
);
}
pJob
=
*
job
;
...
...
@@ -880,13 +914,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t
s
=
taosHashGetSize
(
pJob
->
execTasks
);
if
(
s
<=
0
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
**
task
=
taosHashGet
(
pJob
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -1434,7 +1468,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
plan
->
id
.
queryId
);
pMsg
->
taskId
=
htobe64
(
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
));
pMsg
->
taskId
=
htobe64
(
schGenUUID
(
));
pMsg
->
contentLen
=
htonl
(
msgLen
);
memcpy
(
pMsg
->
msg
,
msg
,
msgLen
);
...
...
@@ -1457,6 +1491,52 @@ _return:
SCH_RET
(
code
);
}
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
)
{
if
(
NULL
==
src
||
NULL
==
dst
||
copyNum
<=
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
*
dst
=
taosArrayInit
(
copyNum
,
sizeof
(
STaskInfo
));
if
(
NULL
==
*
dst
)
{
qError
(
"taosArrayInit %d taskInfo failed"
,
copyNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
int32_t
msgSize
=
src
->
msg
->
contentLen
+
sizeof
(
*
src
->
msg
);
STaskInfo
info
=
{
0
};
info
.
addr
=
src
->
addr
;
for
(
int32_t
i
=
0
;
i
<
copyNum
;
++
i
)
{
info
.
msg
=
malloc
(
msgSize
);
if
(
NULL
==
info
.
msg
)
{
qError
(
"malloc %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
info
.
msg
,
src
->
msg
,
msgSize
);
info
.
msg
->
taskId
=
schGenUUID
();
if
(
NULL
==
taosArrayPush
(
*
dst
,
&
info
))
{
qError
(
"taosArrayPush failed, idx:%d"
,
i
);
free
(
info
.
msg
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
return
TSDB_CODE_SUCCESS
;
_return:
schedulerFreeTaskList
(
*
dst
);
*
dst
=
NULL
;
SCH_RET
(
code
);
}
int32_t
scheduleFetchRows
(
SSchJob
*
pJob
,
void
**
pData
)
{
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
...
...
@@ -1464,14 +1544,15 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
}
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%d"
,
status
);
return
TSDB_CODE_SCH_STATUS_ERROR
;
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
if
(
!
SCH_JOB_NEED_FETCH
(
&
pJob
->
attr
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
...
...
@@ -1484,7 +1565,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
atomic_store_ptr
(
&
pJob
->
res
,
NULL
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
...
...
@@ -1500,7 +1581,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_ERR_JRET
(
code
);
}
...
...
@@ -1547,6 +1628,7 @@ void scheduleFreeJob(void *job) {
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
bool
setJobFree
=
false
;
if
(
SCH_GET_JOB_STATUS
(
pJob
)
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
pJob
->
queryId
,
sizeof
(
pJob
->
queryId
)))
{
...
...
@@ -1554,8 +1636,6 @@ void scheduleFreeJob(void *job) {
return
;
}
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
SCH_JOB_DLOG
(
"job removed from list, no further ref, ref:%d"
,
atomic_load_32
(
&
pJob
->
ref
));
while
(
true
)
{
...
...
@@ -1563,6 +1643,11 @@ void scheduleFreeJob(void *job) {
if
(
0
==
ref
)
{
break
;
}
else
if
(
ref
>
0
)
{
if
(
1
==
ref
&&
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
&&
!
setJobFree
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
setJobFree
=
true
;
}
usleep
(
1
);
}
else
{
assert
(
0
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
e2d8dfaf
...
...
@@ -38,6 +38,20 @@
namespace
{
extern
"C"
int32_t
schHandleResponseMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
);
struct
SSchJob
*
pInsertJob
=
NULL
;
struct
SSchJob
*
pQueryJob
=
NULL
;
uint64_t
schtMergeTemplateId
=
0x4
;
uint64_t
schtFetchTaskId
=
0
;
uint64_t
schtQueryId
=
1
;
bool
schtTestStop
=
false
;
bool
schtTestDeadLoop
=
false
;
int32_t
schtTestMTRunSec
=
60
;
int32_t
schtTestPrintNum
=
1000
;
int32_t
schtStartFetch
=
0
;
void
schtInitLogFile
()
{
...
...
@@ -57,7 +71,7 @@ void schtInitLogFile() {
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
uint64_t
qId
=
schtQueryId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
...
...
@@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan
->
msgType
=
TDMT_VND_QUERY
;
mergePlan
->
id
.
queryId
=
qId
;
mergePlan
->
id
.
templateId
=
0x4444444444
;
mergePlan
->
id
.
templateId
=
schtMergeTemplateId
;
mergePlan
->
id
.
subplanId
=
0x5555555555
;
mergePlan
->
type
=
QUERY_TYPE_MERGE
;
mergePlan
->
level
=
0
;
...
...
@@ -269,15 +283,225 @@ void *schtCreateFetchRspThread(void *param) {
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
rsp
),
0
);
assert
(
code
==
0
);
}
void
*
schtFetchRspThread
(
void
*
aa
)
{
SDataBuf
dataBuf
=
{
0
};
SSchCallbackParam
*
param
=
NULL
;
while
(
!
schtTestStop
)
{
if
(
0
==
atomic_val_compare_exchange_32
(
&
schtStartFetch
,
1
,
0
))
{
continue
;
}
usleep
(
1
);
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
taskId
=
schtFetchTaskId
;
int32_t
code
=
0
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
dataBuf
.
pData
=
rsp
;
dataBuf
.
len
=
sizeof
(
*
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_FETCH_RSP
,
0
);
assert
(
code
==
0
||
code
);
}
}
void
schtFreeQueryJob
(
int32_t
freeThread
)
{
static
uint32_t
freeNum
=
0
;
SSchJob
*
job
=
atomic_load_ptr
(
&
pQueryJob
);
if
(
job
&&
atomic_val_compare_exchange_ptr
(
&
pQueryJob
,
job
,
NULL
))
{
scheduleFreeJob
(
job
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
}
}
}
}
void
*
schtRunJobThread
(
void
*
aa
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SQueryDag
dag
=
{
0
};
schtInitLogFile
();
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
assert
(
code
==
0
);
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
SSchJob
*
job
=
NULL
;
SSchCallbackParam
*
param
=
NULL
;
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
uint32_t
jobFinished
=
0
;
while
(
!
schtTestStop
)
{
schtBuildQueryDag
(
&
dag
);
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
);
assert
(
code
==
0
);
execTasks
=
taosHashInit
(
5
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
schtFetchTaskId
=
task
->
taskId
-
1
;
taosHashPut
(
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
task
,
sizeof
(
*
task
));
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pQueryJob
=
job
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
if
(
0
==
code
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
assert
(
pRsp
->
completed
==
1
);
assert
(
pRsp
->
numOfRows
==
10
);
}
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
assert
(
data
==
(
void
*
)
NULL
);
schtFreeQueryJob
(
0
);
taosHashCleanup
(
execTasks
);
schtFreeQueryDag
(
&
dag
);
if
(
++
jobFinished
%
schtTestPrintNum
==
0
)
{
printf
(
"jobFinished:%d
\n
"
,
jobFinished
);
}
++
schtQueryId
;
}
schedulerDestroy
();
}
void
*
schtFreeJobThread
(
void
*
aa
)
{
while
(
!
schtTestStop
)
{
usleep
(
rand
()
%
2000
);
schtFreeQueryJob
(
1
);
}
}
struct
SSchJob
*
pInsertJob
=
NULL
;
}
TEST
(
queryTest
,
normalCase
)
{
...
...
@@ -427,13 +651,30 @@ TEST(insertTest, normalCase) {
}
TEST
(
multiThread
,
forceFree
)
{
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
schtInitLogFile
();
pthread_t
thread1
,
thread2
,
thread3
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtRunJobThread
,
NULL
);
pthread_create
(
&
(
thread2
),
&
thattr
,
schtFreeJobThread
,
NULL
);
pthread_create
(
&
(
thread3
),
&
thattr
,
schtFetchRspThread
,
NULL
);
while
(
true
)
{
if
(
schtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
schtTestMTRunSec
);
break
;
}
}
schtTestStop
=
true
;
sleep
(
3
);
}
int
main
(
int
argc
,
char
**
argv
)
{
srand
(
time
(
NULL
));
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
...
...
source/util/src/terror.c
浏览文件 @
e2d8dfaf
...
...
@@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPING
,
"Task dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JOB_FREED
,
"Job already freed"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录