Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c572b9ca
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c572b9ca
编写于
3月 03, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
0729144b
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
244 addition
and
198 deletion
+244
-198
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+4
-6
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+6
-6
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+12
-3
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+150
-133
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+71
-49
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
c572b9ca
...
...
@@ -23,8 +23,6 @@ extern "C" {
#include "catalog.h"
#include "planner.h"
struct
SSchJob
;
typedef
struct
SSchedulerCfg
{
uint32_t
maxJobNum
;
}
SSchedulerCfg
;
...
...
@@ -72,7 +70,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
);
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
);
/**
* Process the query job, generated according to the query physical plan.
...
...
@@ -80,7 +78,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
const
char
*
sql
,
struct
SSchJob
**
pJob
);
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
);
/**
* Fetch query result from the remote query executor
...
...
@@ -88,7 +86,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa
* @param data
* @return
*/
int32_t
schedulerFetchRows
(
struct
SSchJob
*
pJ
ob
,
void
**
data
);
int32_t
schedulerFetchRows
(
int64_t
j
ob
,
void
**
data
);
/**
...
...
@@ -102,7 +100,7 @@ int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
* Free the query job
* @param pJob
*/
void
schedulerFreeJob
(
void
*
pJ
ob
);
void
schedulerFreeJob
(
int64_t
j
ob
);
void
schedulerDestroy
(
void
);
...
...
source/client/inc/clientInt.h
浏览文件 @
c572b9ca
...
...
@@ -171,7 +171,7 @@ typedef struct SRequestSendRecvBody {
void
*
fp
;
SShowReqInfo
showInfo
;
// todo this attribute will be removed after the query framework being completed.
SDataBuf
requestMsg
;
struct
SSchJob
*
pQ
ueryJob
;
// query job, created according to sql query DAG.
int64_t
q
ueryJob
;
// query job, created according to sql query DAG.
struct
SQueryDag
*
pDag
;
// the query dag, generated according to the sql statement.
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
...
...
source/client/src/clientImpl.c
浏览文件 @
c572b9ca
...
...
@@ -227,10 +227,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
SArray
*
pNodeList
)
{
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
pQ
ueryJob
,
pRequest
->
sqlstr
,
&
res
);
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
q
ueryJob
,
pRequest
->
sqlstr
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
pQueryJob
!=
NULL
)
{
schedulerFreeJob
(
pRequest
->
body
.
pQ
ueryJob
);
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
q
ueryJob
);
}
pRequest
->
code
=
code
;
...
...
@@ -240,8 +240,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if
(
TSDB_SQL_INSERT
==
pRequest
->
type
||
TSDB_SQL_CREATE_TABLE
==
pRequest
->
type
)
{
pRequest
->
body
.
resInfo
.
numOfRows
=
res
.
numOfRows
;
if
(
pRequest
->
body
.
pQueryJob
!=
NULL
)
{
schedulerFreeJob
(
pRequest
->
body
.
pQ
ueryJob
);
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
q
ueryJob
);
}
}
...
...
@@ -494,7 +494,7 @@ void* doFetchRow(SRequestObj* pRequest) {
}
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
schedulerFetchRows
(
pRequest
->
body
.
pQ
ueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
int32_t
code
=
schedulerFetchRows
(
pRequest
->
body
.
q
ueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
NULL
;
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
c572b9ca
...
...
@@ -36,6 +36,11 @@ enum {
SCH_WRITE
,
};
typedef
struct
SSchTrans
{
void
*
transInst
;
void
*
transHandle
;
}
SSchTrans
;
typedef
struct
SSchApiStat
{
}
SSchApiStat
;
...
...
@@ -59,12 +64,13 @@ typedef struct SSchedulerMgmt {
uint64_t
taskId
;
// sequential taksId
uint64_t
sId
;
// schedulerId
SSchedulerCfg
cfg
;
SHashObj
*
jobs
;
// key: queryId, value: SQueryJob*
int32_t
jobRef
;
SSchedulerStat
stat
;
}
SSchedulerMgmt
;
typedef
struct
SSchCallbackParam
{
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
}
SSchCallbackParam
;
...
...
@@ -75,7 +81,8 @@ typedef struct SSchLevel {
int32_t
taskFailed
;
int32_t
taskSucceed
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
int32_t
taskLaunchIdx
;
// launch startup index
SArray
*
subTasks
;
// Element is SQueryTask
}
SSchLevel
;
typedef
struct
SSchTask
{
...
...
@@ -105,6 +112,7 @@ typedef struct SSchJobAttr {
}
SSchJobAttr
;
typedef
struct
SSchJob
{
int64_t
refId
;
uint64_t
queryId
;
SSchJobAttr
attr
;
int32_t
levelNum
;
...
...
@@ -119,7 +127,6 @@ typedef struct SSchJob {
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
int32_t
ref
;
int8_t
status
;
SQueryNodeAddr
resNode
;
tsem_t
rspSem
;
...
...
@@ -168,6 +175,8 @@ typedef struct SSchJob {
static
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
);
static
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
);
SSchJob
*
schAcquireJob
(
int64_t
refId
);
int32_t
schReleaseJob
(
int64_t
refId
);
#ifdef __cplusplus
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
c572b9ca
...
...
@@ -17,12 +17,17 @@
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
typedef
struct
SSchTrans
{
void
*
transInst
;
void
*
transHandle
;
}
SSchTrans
;
static
SSchedulerMgmt
schMgmt
=
{
0
};
SSchedulerMgmt
schMgmt
=
{
0
};
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
...
...
@@ -886,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
case
TDMT_VND_DROP_TASK_RSP
:
{
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, ref
:%d"
,
atomic_load_32
(
&
pJob
->
ref
)
);
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, ref
Id:%"
PRIx64
,
pJob
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
}
...
...
@@ -908,28 +913,23 @@ _return:
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
SSchJob
*
pJob
=
NULL
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
*
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
)
);
if
(
NULL
==
job
||
NULL
==
(
*
job
)
)
{
qError
(
"QID:
%"
PRIx64
" taosHashGet queryId not exist, may be dropped"
,
pParam
->
query
Id
);
SSchJob
*
pJob
=
taosAcquireRef
(
schMgmt
.
jobRef
,
pParam
->
refId
);
if
(
NULL
==
pJob
)
{
qError
(
"QID:
0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:%"
PRIx64
,
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
ref
Id
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
}
pJob
=
*
job
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int32_t
s
=
taosHashGetSize
(
pJob
->
execTasks
);
if
(
s
<=
0
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRId64
" no task in execTask list"
,
pParam
->
query
Id
,
pParam
->
taskId
);
SCH_JOB_ELOG
(
"empty execTask list, refId:%"
PRIx64
", taskId:%"
PRIx64
,
pParam
->
ref
Id
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
**
task
=
taosHashGet
(
pJob
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRId64
" taosHashGet taskId not exist"
,
pParam
->
query
Id
,
pParam
->
taskId
);
SCH_JOB_ELOG
(
"task not found in execTask list, refId:%"
PRIx64
", taskId:%"
PRIx64
,
pParam
->
ref
Id
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -942,7 +942,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
if
(
pJob
)
{
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
taosReleaseRef
(
schMgmt
.
jobRef
,
pParam
->
refId
);
}
tfree
(
param
);
...
...
@@ -1003,28 +1003,29 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t
schAsyncSendMsg
(
void
*
transport
,
SEpSet
*
epSet
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
schAsyncSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
transport
,
SEpSet
*
epSet
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SSchTrans
*
trans
=
(
SSchTrans
*
)
transport
;
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" calloc %d failed"
,
qId
,
tId
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSchCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchCallbackParam
));
if
(
NULL
==
param
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" calloc %d failed"
,
qId
,
tId
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
param
->
queryId
=
qId
;
param
->
taskId
=
tId
;
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
pTask
->
taskId
;
pMsgSendInfo
->
param
=
param
;
...
...
@@ -1040,7 +1041,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
SCH_ERR_JRET
(
code
);
}
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
" req msg sent, type:%d, %s"
,
qId
,
t
Id
,
msgType
,
TMSG_INFO
(
msgType
));
SCH_TASK_DLOG
(
"req msg sent, refId:%"
PRIx64
", type:%d, %s"
,
pJob
->
ref
Id
,
msgType
,
TMSG_INFO
(
msgType
));
return
TSDB_CODE_SUCCESS
;
_return:
...
...
@@ -1160,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32
(
&
pTask
->
lastMsgType
,
msgType
);
SSchTrans
trans
=
{.
transInst
=
pJob
->
transport
,
.
transHandle
=
pTask
->
handle
};
SCH_ERR_JRET
(
schAsyncSendMsg
(
&
trans
,
&
epSet
,
pJob
->
queryId
,
pTask
->
taskId
,
msgType
,
msg
,
msgSize
));
SCH_ERR_JRET
(
schAsyncSendMsg
(
pJob
,
pTask
,
&
trans
,
&
epSet
,
msgType
,
msg
,
msgSize
));
if
(
isCandidateAddr
)
{
SCH_ERR_RET
(
schRecordTaskExecNode
(
pJob
,
pTask
,
addr
));
...
...
@@ -1283,7 +1284,60 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList
(
pJob
,
pJob
->
failTasks
);
}
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
job
,
const
char
*
sql
,
bool
syncSchedule
)
{
int32_t
schCancelJob
(
SSchJob
*
pJob
)
{
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
void
schFreeJobImpl
(
void
*
job
)
{
if
(
NULL
==
job
)
{
return
;
}
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
int64_t
refId
=
pJob
->
refId
;
if
(
pJob
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
schCancelJob
(
pJob
);
}
schDropJobAllTasks
(
pJob
);
pJob
->
subPlans
=
NULL
;
// it is a reference to pDag->pSubplans
int32_t
numOfLevels
=
taosArrayGetSize
(
pJob
->
levels
);
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
schFreeTask
(
pTask
);
}
taosArrayDestroy
(
pLevel
->
subTasks
);
}
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
tfree
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" job freed, refId:%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
}
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
==
NULL
||
(
pNodeList
&&
taosArrayGetSize
(
pNodeList
)
<=
0
))
{
...
...
@@ -1327,21 +1381,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
tsem_init
(
&
pJob
->
rspSem
,
0
,
0
);
code
=
taosHashPut
(
schMgmt
.
jobs
,
&
pJob
->
queryId
,
sizeof
(
pJob
->
queryId
),
&
pJob
,
POINTER_BYTES
);
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
SCH_JOB_ELOG
(
"job already exist, isQueryJob:%d"
,
pJob
->
attr
.
queryJob
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
else
{
SCH_JOB_ELOG
(
"taosHashPut job failed, errno:%d"
,
errno
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pJob
->
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
if
(
pJob
->
refId
<
0
)
{
SCH_JOB_ELOG
(
"taosHashPut job failed, error:%s"
,
tstrerror
(
terrno
));
SCH_ERR_JRET
(
terrno
);
}
SCH_JOB_DLOG
(
"job refId:%"
PRIx64
,
pJob
->
refId
);
pJob
->
status
=
JOB_TASK_STATUS_NOT_START
;
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
*
(
SSchJob
**
)
job
=
pJob
;
taosAcquireRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
*
job
=
pJob
->
refId
;
if
(
syncSchedule
)
{
SCH_JOB_DLOG
(
"will wait for rsp now, job status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
...
...
@@ -1349,25 +1402,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
}
SCH_JOB_DLOG
(
"job exec done, job status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
taosReleaseRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
_return:
*
(
SSchJob
**
)
job
=
NULL
;
schedulerFreeJob
(
pJob
);
schFreeJobImpl
(
pJob
);
SCH_RET
(
code
);
}
int32_t
schCancelJob
(
SSchJob
*
pJob
)
{
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
if
(
schMgmt
.
job
s
)
{
if
(
schMgmt
.
job
Ref
)
{
qError
(
"scheduler already initialized"
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
...
...
@@ -1381,9 +1429,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
else
{
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_JOB_NUMBER
;
}
schMgmt
.
job
s
=
taosHashInit
(
schMgmt
.
cfg
.
maxJobNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
schMgmt
.
jobs
)
{
schMgmt
.
job
Ref
=
taosOpenRef
(
schMgmt
.
cfg
.
maxJobNum
,
schFreeJobImpl
);
if
(
schMgmt
.
jobRef
<
0
)
{
qError
(
"init schduler jobs failed, num:%u"
,
schMgmt
.
cfg
.
maxJobNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -1398,24 +1446,28 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
)
{
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRes
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
true
));
pRes
->
code
=
atomic_load_32
(
&
(
*
pJob
)
->
errCode
);
pRes
->
numOfRows
=
(
*
pJob
)
->
resNumOfRows
;
SSchJob
*
job
=
taosAcquireRef
(
schMgmt
.
jobRef
,
*
pJob
);
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
taosReleaseRef
(
schMgmt
.
jobRef
,
*
pJob
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
const
char
*
sql
,
struct
SSchJob
**
pJob
)
{
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryDag
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
false
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1541,28 +1593,35 @@ _return:
}
int32_t
schedulerFetchRows
(
SSchJob
*
pJ
ob
,
void
**
pData
)
{
if
(
NULL
==
p
Job
||
NULL
==
p
Data
)
{
int32_t
schedulerFetchRows
(
int64_t
j
ob
,
void
**
pData
)
{
if
(
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
SSchJob
*
pJob
=
taosAcquireRef
(
schMgmt
.
jobRef
,
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
taosReleaseRef
(
schMgmt
.
jobRef
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
&
pJob
->
attr
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
taosReleaseRef
(
schMgmt
.
jobRef
,
job
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
taosReleaseRef
(
schMgmt
.
jobRef
,
job
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
...
...
@@ -1588,7 +1647,6 @@ int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
_return:
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
...
...
@@ -1609,96 +1667,47 @@ _return:
SCH_JOB_DLOG
(
"empty res and set query complete, code:%x"
,
code
);
}
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"fetch done, totalRows:%d, code:%s"
,
pJob
->
resNumOfRows
,
tstrerror
(
code
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
taosReleaseRef
(
schMgmt
.
jobRef
,
job
);
SCH_RET
(
code
);
}
int32_t
scheduleCancelJob
(
void
*
job
)
{
SSchJob
*
pJob
=
(
SSchJob
*
)
job
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int32_t
scheduleCancelJob
(
int64_t
job
)
{
SSchJob
*
pJob
=
taosAcquireRef
(
schMgmt
.
jobRef
,
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
int32_t
code
=
schCancelJob
(
pJob
);
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
taosReleaseRef
(
schMgmt
.
jobRef
,
job
);
SCH_RET
(
code
);
}
void
schedulerFreeJob
(
void
*
job
)
{
if
(
NULL
==
job
)
{
void
schedulerFreeJob
(
int64_t
job
)
{
SSchJob
*
pJob
=
taosAcquireRef
(
schMgmt
.
jobRef
,
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
return
;
}
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
bool
setJobFree
=
false
;
if
(
SCH_GET_JOB_STATUS
(
pJob
)
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
pJob
->
queryId
,
sizeof
(
pJob
->
queryId
)))
{
SCH_JOB_ELOG
(
"taosHashRemove job from list failed, may already freed, pJob:%p"
,
pJob
);
return
;
}
SCH_JOB_DLOG
(
"job removed from list, no further ref, ref:%d"
,
atomic_load_32
(
&
pJob
->
ref
));
while
(
true
)
{
int32_t
ref
=
atomic_load_32
(
&
pJob
->
ref
);
if
(
0
==
ref
)
{
break
;
}
else
if
(
ref
>
0
)
{
if
(
1
==
ref
&&
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
&&
!
setJobFree
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
setJobFree
=
true
;
}
usleep
(
1
);
}
else
{
SCH_JOB_ELOG
(
"invalid job ref number, ref:%d"
,
ref
);
break
;
}
}
SCH_JOB_DLOG
(
"job no ref now, status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
if
(
pJob
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
schCancelJob
(
pJob
);
}
schDropJobAllTasks
(
pJob
);
if
(
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
}
pJob
->
subPlans
=
NULL
;
// it is a reference to pDag->pSubplans
int32_t
numOfLevels
=
taosArrayGetSize
(
pJob
->
levels
);
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
schFreeTask
(
pTask
);
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:%"
PRIx64
,
job
);
taosArrayDestroy
(
pLevel
->
subTasks
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:%"
PRIx64
,
job
);
}
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
tfree
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" job freed"
,
queryId
);
}
void
schedulerFreeTaskList
(
SArray
*
taskList
)
{
...
...
@@ -1716,9 +1725,17 @@ void schedulerFreeTaskList(SArray *taskList) {
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
jobs
)
{
taosHashCleanup
(
schMgmt
.
jobs
);
//TODO
schMgmt
.
jobs
=
NULL
;
if
(
schMgmt
.
jobRef
)
{
SSchJob
*
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
0
);
while
(
pJob
)
{
taosRemoveRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
}
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
0
;
}
}
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
c572b9ca
...
...
@@ -38,15 +38,15 @@
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
#include "tref.h"
namespace
{
extern
"C"
int32_t
schHandleResponseMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
);
struct
SSchJob
*
pInsertJob
=
NULL
;
struct
SSchJob
*
pQueryJob
=
NULL
;
int64_t
insertJobRefId
=
0
;
int64_t
queryJobRefId
=
0
;
uint64_t
schtMergeTemplateId
=
0x4
;
uint64_t
schtFetchTaskId
=
0
;
...
...
@@ -65,6 +65,7 @@ void schtInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
...
@@ -255,34 +256,40 @@ void schtSetAsyncSendMsgToServer() {
void
*
schtSendRsp
(
void
*
param
)
{
SSchJob
*
job
=
NULL
;
SSchJob
*
pJob
=
NULL
;
int64_t
job
=
0
;
int32_t
code
=
0
;
while
(
true
)
{
job
=
*
(
SSchJob
*
*
)
param
;
job
=
*
(
int64_t
*
)
param
;
if
(
job
)
{
break
;
}
usleep
(
1000
);
}
pJob
=
schAcquireJob
(
job
);
void
*
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SSubmitRsp
rsp
=
{
0
};
rsp
.
affectedRows
=
10
;
schHandleResponseMsg
(
j
ob
,
task
,
TDMT_VND_SUBMIT_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
schHandleResponseMsg
(
pJ
ob
,
task
,
TDMT_VND_SUBMIT_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
schReleaseJob
(
job
);
return
NULL
;
}
void
*
schtCreateFetchRspThread
(
void
*
param
)
{
struct
SSchJob
*
job
=
(
struct
SSchJob
*
)
param
;
int64_t
job
=
*
(
int64_t
*
)
param
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
sleep
(
1
);
...
...
@@ -291,8 +298,10 @@ void *schtCreateFetchRspThread(void *param) {
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
*
rsp
),
0
);
code
=
schHandleResponseMsg
(
pJob
,
pJob
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
*
rsp
),
0
);
schReleaseJob
(
job
);
assert
(
code
==
0
);
}
...
...
@@ -329,9 +338,9 @@ void *schtFetchRspThread(void *aa) {
void
schtFreeQueryJob
(
int32_t
freeThread
)
{
static
uint32_t
freeNum
=
0
;
SSchJob
*
job
=
atomic_load_ptr
(
&
pQueryJob
)
;
int64_t
job
=
queryJobRefId
;
if
(
job
&&
atomic_val_compare_exchange_
ptr
(
&
pQueryJob
,
job
,
NULL
))
{
if
(
job
&&
atomic_val_compare_exchange_
64
(
&
queryJobRefId
,
job
,
0
))
{
schedulerFreeJob
(
job
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
...
...
@@ -360,7 +369,7 @@ void* schtRunJobThread(void *aa) {
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
SSchJob
*
j
ob
=
NULL
;
SSchJob
*
pJ
ob
=
NULL
;
SSchCallbackParam
*
param
=
NULL
;
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
...
...
@@ -376,24 +385,29 @@ void* schtRunJobThread(void *aa) {
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
job
);
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
queryJobRefId
);
assert
(
code
==
0
);
pJob
=
schAcquireJob
(
queryJobRefId
);
if
(
NULL
==
pJob
)
{
taosArrayDestroy
(
qnodeList
);
schtFreeQueryDag
(
&
dag
);
continue
;
}
execTasks
=
taosHashInit
(
5
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
void
*
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
schtFetchTaskId
=
task
->
taskId
-
1
;
taosHashPut
(
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
task
,
sizeof
(
*
task
));
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pQueryJob
=
job
;
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
...
...
@@ -412,8 +426,9 @@ void* schtRunJobThread(void *aa) {
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
...
...
@@ -431,7 +446,8 @@ void* schtRunJobThread(void *aa) {
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
...
...
@@ -450,7 +466,8 @@ void* schtRunJobThread(void *aa) {
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
...
...
@@ -470,7 +487,7 @@ void* schtRunJobThread(void *aa) {
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
pQueryJob
,
&
data
);
code
=
schedulerFetchRows
(
queryJobRefId
,
&
data
);
assert
(
code
==
0
||
code
);
if
(
0
==
code
)
{
...
...
@@ -480,12 +497,13 @@ void* schtRunJobThread(void *aa) {
}
data
=
NULL
;
code
=
schedulerFetchRows
(
pQueryJob
,
&
data
);
code
=
schedulerFetchRows
(
queryJobRefId
,
&
data
);
assert
(
code
==
0
||
code
);
schtFreeQueryJob
(
0
);
taosHashCleanup
(
execTasks
);
taosArrayDestroy
(
qnodeList
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -516,7 +534,7 @@ TEST(queryTest, normalCase) {
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SSchJob
*
pJob
=
NULL
;
int64_t
job
=
0
;
SQueryDag
dag
=
{
0
};
schtInitLogFile
();
...
...
@@ -537,59 +555,61 @@ TEST(queryTest, normalCase) {
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
pJ
ob
);
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
j
ob
);
ASSERT_EQ
(
code
,
0
);
SSchJob
*
job
=
(
SSchJob
*
)
pJob
;
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
SSchJob
*
pJob
=
schAcquireJob
(
job
);
void
*
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
j
ob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
pJ
ob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
NULL
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
j
ob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
pJ
ob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
printf
(
"code:%d"
,
code
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
NULL
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
j
ob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
pJ
ob
,
task
,
TDMT_VND_QUERY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
NULL
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleResponseMsg
(
j
ob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
pJ
ob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
j
ob
->
execTasks
,
pIter
);
pIter
=
taosHashIterate
(
pJ
ob
->
execTasks
,
pIter
);
}
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtCreateFetchRspThread
,
job
);
pthread_create
(
&
(
thread1
),
&
thattr
,
schtCreateFetchRspThread
,
&
job
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
...
...
@@ -603,9 +623,11 @@ TEST(queryTest, normalCase) {
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
data
);
ASSERT_TRUE
(
data
==
NULL
);
schReleaseJob
(
job
);
schedulerFreeJob
(
pJ
ob
);
schedulerFreeJob
(
j
ob
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -644,14 +666,14 @@ TEST(insertTest, normalCase) {
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
pInsertJob
);
pthread_create
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
insertJobRefId
);
SQueryResult
res
=
{
0
};
code
=
schedulerExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pInsertJob
,
"insert into tb values(now,1)"
,
&
res
);
code
=
schedulerExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
insertJobRefId
,
"insert into tb values(now,1)"
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
schedulerFreeJob
(
pInsertJob
);
schedulerFreeJob
(
insertJobRefId
);
schedulerDestroy
();
}
...
...
@@ -684,4 +706,4 @@ int main(int argc, char** argv) {
return
RUN_ALL_TESTS
();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录