Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e27f8e8a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e27f8e8a
编写于
5月 26, 2022
作者:
D
dapan1121
提交者:
GitHub
5月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12926 from taosdata/feature/async.scheduler
feat: scheduler async api
上级
5635dcf0
041df53f
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
389 addition
and
178 deletion
+389
-178
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+8
-3
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+0
-6
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+2
-5
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+31
-12
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+242
-17
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+23
-22
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+31
-101
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+50
-10
未找到文件。
include/libs/scheduler/scheduler.h
浏览文件 @
e27f8e8a
...
...
@@ -54,8 +54,6 @@ typedef struct SQueryProfileSummary {
typedef
struct
SQueryResult
{
int32_t
code
;
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
void
*
res
;
}
SQueryResult
;
...
...
@@ -64,6 +62,10 @@ typedef struct STaskInfo {
SSubQueryMsg
*
msg
;
}
STaskInfo
;
typedef
void
(
*
schedulerExecCallback
)(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
);
typedef
void
(
*
schedulerFetchCallback
)(
void
*
pResult
,
void
*
param
,
int32_t
code
);
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
);
/**
...
...
@@ -80,7 +82,8 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
);
int32_t
schedulerAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
schedulerExecCallback
fp
,
void
*
param
);
/**
* Fetch query result from the remote query executor
...
...
@@ -90,6 +93,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
*/
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
data
);
int32_t
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
);
int32_t
schedulerGetTasksStatus
(
int64_t
job
,
SArray
*
pSub
);
...
...
source/client/src/clientHb.c
浏览文件 @
e27f8e8a
...
...
@@ -140,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
STscObj
*
pTscObj
=
(
STscObj
*
)
acquireTscObj
(
pRsp
->
connKey
.
tscRid
);
if
(
NULL
==
pTscObj
)
{
tscDebug
(
"tscObj rid %"
PRIx64
" not exist"
,
pRsp
->
connKey
.
tscRid
);
}
else
{
}
else
{
if
(
pRsp
->
query
->
totalDnodes
>
1
&&
!
isEpsetEqual
(
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
pRsp
->
query
->
epSet
))
{
updateEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
,
&
pRsp
->
query
->
epSet
);
}
...
...
source/client/src/clientImpl.c
浏览文件 @
e27f8e8a
...
...
@@ -292,7 +292,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
,
void
**
pRes
)
{
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
e27f8e8a
...
...
@@ -540,12 +540,6 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
SHashObj
*
metaCache
=
taosHashInit
(
gCtgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
metaCache
)
{
qError
(
"taosHashInit failed, num:%d"
,
gCtgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
code
=
taosHashPut
(
gCtgMgmt
.
pCluster
,
&
clusterId
,
sizeof
(
clusterId
),
&
clusterCtg
,
POINTER_BYTES
);
if
(
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
e27f8e8a
...
...
@@ -1078,7 +1078,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t
tbNum
=
0
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
bool
autoCreateTbl
=
false
;
STableMeta
*
pMeta
=
NULL
;
// for each table
while
(
1
)
{
...
...
@@ -1141,12 +1140,10 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
CHECK_CODE
(
getDataBlockFromList
(
pCxt
->
pTableBlockHashObj
,
tbFName
,
strlen
(
tbFName
),
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
getTableInfo
(
pCxt
->
pTableMeta
).
rowSize
,
pCxt
->
pTableMeta
,
&
dataBuf
,
NULL
,
&
pCxt
->
createTblReq
));
pMeta
=
pCxt
->
pTableMeta
;
pCxt
->
pTableMeta
=
NULL
;
if
(
TK_NK_LP
==
sToken
.
type
)
{
// pSql -> field1_name, ...)
CHECK_CODE
(
parseBoundColumns
(
pCxt
,
&
dataBuf
->
boundColumnInfo
,
getTableColumnSchema
(
pMeta
)));
CHECK_CODE
(
parseBoundColumns
(
pCxt
,
&
dataBuf
->
boundColumnInfo
,
getTableColumnSchema
(
p
Cxt
->
pTable
Meta
)));
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
}
...
...
@@ -1182,7 +1179,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
p
Cxt
->
pTable
Meta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
e27f8e8a
...
...
@@ -39,9 +39,14 @@ enum {
SCH_WRITE
,
};
enum
{
SCH_EXEC_CB
=
1
,
SCH_FETCH_CB
,
};
typedef
struct
SSchTrans
{
void
*
transInst
;
void
*
trans
Handle
;
void
*
pTrans
;
void
*
p
Handle
;
}
SSchTrans
;
typedef
struct
SSchHbTrans
{
...
...
@@ -74,12 +79,19 @@ typedef struct SSchJobStat {
}
SSchJobStat
;
typedef
struct
SSch
eduler
Stat
{
typedef
struct
SSchStat
{
SSchApiStat
api
;
SSchRuntimeStat
runtime
;
SSchJobStat
job
;
}
SSch
eduler
Stat
;
}
SSchStat
;
typedef
struct
SSchResInfo
{
SQueryResult
*
queryRes
;
void
**
fetchRes
;
schedulerExecCallback
execFp
;
schedulerFetchCallback
fetchFp
;
void
*
userParam
;
}
SSchResInfo
;
typedef
struct
SSchedulerMgmt
{
uint64_t
taskId
;
// sequential taksId
...
...
@@ -89,7 +101,7 @@ typedef struct SSchedulerMgmt {
bool
exit
;
int32_t
jobRef
;
int32_t
jobNum
;
SSch
edulerStat
stat
;
SSch
Stat
stat
;
SHashObj
*
hbConnections
;
}
SSchedulerMgmt
;
...
...
@@ -108,7 +120,7 @@ typedef struct SSchTaskCallbackParam {
typedef
struct
SSchHbCallbackParam
{
SSchCallbackParamHeader
head
;
SQueryNodeEpId
nodeEpId
;
void
*
transport
;
void
*
pTrans
;
}
SSchHbCallbackParam
;
typedef
struct
SSchFlowControl
{
...
...
@@ -170,7 +182,7 @@ typedef struct SSchJob {
SSchJobAttr
attr
;
int32_t
levelNum
;
int32_t
taskNum
;
void
*
transport
;
void
*
pTrans
;
SArray
*
nodeList
;
// qnode/vnode list, SArray<SQueryNodeAddr>
SArray
*
levels
;
// starting from 0. SArray<SSchLevel>
SNodeList
*
subPlans
;
// subplan pointer copied from DAG, no need to free it in scheduler
...
...
@@ -191,12 +203,13 @@ typedef struct SSchJob {
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
SRWLatch
resLock
;
void
*
queryRes
;
void
*
resData
;
//TODO free it or not
int32_t
resNumOfRows
;
SSchResInfo
userRes
;
const
char
*
sql
;
int32_t
userCb
;
SQueryProfileSummary
summary
;
}
SSchJob
;
...
...
@@ -292,15 +305,21 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
void
schFreeRpcCtxVal
(
const
void
*
arg
);
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
);
int32_t
schRecordTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
void
*
handle
);
int32_t
schExecStaticExplain
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
);
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
int64_t
startTs
,
bool
sync
);
int32_t
schExecStaticExplain
Job
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
bool
sync
);
int32_t
schExecJobImpl
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
int64_t
startTs
,
bool
sync
);
int32_t
schChkUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
int32_t
schCancelJob
(
SSchJob
*
pJob
);
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
);
uint64_t
schGenTaskId
(
void
);
void
schCloseJobRef
(
void
);
int32_t
schExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SSchResInfo
*
pRes
);
int32_t
schAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SSchResInfo
*
pRes
);
int32_t
schFetchRows
(
SSchJob
*
pJob
);
int32_t
schAsyncFetchRows
(
SSchJob
*
pJob
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
e27f8e8a
...
...
@@ -39,8 +39,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return
TSDB_CODE_SUCCESS
;
}
int32_t
schInitJob
(
SSchJob
**
pSchJob
,
SQueryPlan
*
pDag
,
void
*
transport
,
SArray
*
pNodeList
,
const
char
*
sql
,
int64_t
startTs
,
bool
syncSchedule
)
{
int32_t
schInitJob
(
SSchJob
**
pSchJob
,
SQueryPlan
*
pDag
,
void
*
pTrans
,
SArray
*
pNodeList
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
int64_t
startTs
,
bool
syncSchedule
)
{
int32_t
code
=
0
;
int64_t
refId
=
-
1
;
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
...
...
@@ -51,9 +51,12 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
pJob
->
attr
.
explainMode
=
pDag
->
explainInfo
.
mode
;
pJob
->
attr
.
syncSchedule
=
syncSchedule
;
pJob
->
transport
=
transport
;
pJob
->
pTrans
=
pTrans
;
pJob
->
sql
=
sql
;
if
(
pRes
)
{
pJob
->
userRes
=
*
pRes
;
}
if
(
pNodeList
!=
NULL
)
{
pJob
->
nodeList
=
taosArrayDup
(
pNodeList
);
}
...
...
@@ -458,6 +461,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET
(
schBuildTaskRalation
(
pJob
,
planToTask
));
_return:
if
(
planToTask
)
{
taosHashCleanup
(
planToTask
);
}
...
...
@@ -727,6 +731,69 @@ _return:
SCH_JOB_DLOG
(
"job errCode updated to %x - %s"
,
errCode
,
tstrerror
(
errCode
));
}
int32_t
schSetJobQueryRes
(
SSchJob
*
pJob
,
SQueryResult
*
pRes
)
{
pRes
->
code
=
atomic_load_32
(
&
pJob
->
errCode
);
pRes
->
numOfRows
=
pJob
->
resNumOfRows
;
pRes
->
res
=
pJob
->
queryRes
;
pJob
->
queryRes
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSetJobFetchRes
(
SSchJob
*
pJob
,
void
**
pData
)
{
int32_t
code
=
0
;
if
(
pJob
->
resData
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_RET
(
schChkUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
resData
,
*
pData
,
NULL
))
{
continue
;
}
break
;
}
if
(
NULL
==
*
pData
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
if
(
rsp
)
{
rsp
->
completed
=
1
;
}
*
pData
=
rsp
;
SCH_JOB_DLOG
(
"empty res and set query complete, code:%x"
,
code
);
}
SCH_JOB_DLOG
(
"fetch done, totalRows:%d"
,
pJob
->
resNumOfRows
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schNotifyUserQueryRes
(
SSchJob
*
pJob
)
{
pJob
->
userRes
.
queryRes
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pJob
->
userRes
.
queryRes
));
if
(
pJob
->
userRes
.
queryRes
)
{
schSetJobQueryRes
(
pJob
,
pJob
->
userRes
.
queryRes
);
}
(
*
pJob
->
userRes
.
execFp
)(
pJob
->
userRes
.
queryRes
,
pJob
->
userRes
.
userParam
,
atomic_load_32
(
&
pJob
->
errCode
));
pJob
->
userRes
.
queryRes
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schNotifyUserFetchRes
(
SSchJob
*
pJob
)
{
void
*
pRes
=
NULL
;
SCH_ERR_RET
(
schSetJobFetchRes
(
pJob
,
&
pRes
));
(
*
pJob
->
userRes
.
fetchFp
)(
pRes
,
pJob
->
userRes
.
userParam
,
atomic_load_32
(
&
pJob
->
errCode
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
schChkUpdateJobStatus
(
pJob
,
status
));
...
...
@@ -741,6 +808,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
SCH_JOB_DLOG
(
"job failed with error: %s"
,
tstrerror
(
code
));
if
(
!
pJob
->
attr
.
syncSchedule
)
{
if
(
SCH_EXEC_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_EXEC_CB
,
0
))
{
schNotifyUserQueryRes
(
pJob
);
}
else
if
(
SCH_FETCH_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_FETCH_CB
,
0
))
{
schNotifyUserFetchRes
(
pJob
);
}
}
SCH_RET
(
code
);
}
...
...
@@ -762,6 +837,10 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
if
(
pJob
->
attr
.
syncSchedule
)
{
tsem_post
(
&
pJob
->
rspSem
);
}
else
if
(
SCH_EXEC_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_EXEC_CB
,
0
))
{
schNotifyUserQueryRes
(
pJob
);
}
else
if
(
SCH_FETCH_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_FETCH_CB
,
0
))
{
schNotifyUserFetchRes
(
pJob
);
}
if
(
atomic_load_8
(
&
pJob
->
userFetch
))
{
...
...
@@ -1218,6 +1297,7 @@ void schFreeJobImpl(void *job) {
tFreeSSubmitRsp
((
SSubmitRsp
*
)
pJob
->
queryRes
);
}
taosMemoryFreeClear
(
pJob
->
userRes
.
queryRes
);
taosMemoryFreeClear
(
pJob
->
resData
);
taosMemoryFreeClear
(
pJob
);
...
...
@@ -1228,8 +1308,8 @@ void schFreeJobImpl(void *job) {
schCloseJobRef
();
}
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
int64_t
startTs
,
bool
sync
)
{
int32_t
schExecJobImpl
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
int64_t
startTs
,
bool
sync
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
==
NULL
||
taosArrayGetSize
(
pNodeList
)
<=
0
)
{
...
...
@@ -1238,31 +1318,68 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_JRET
(
schInitJob
(
&
pJob
,
pDag
,
transport
,
pNodeList
,
sql
,
startTs
,
sync
));
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
SCH_ERR_RET
(
schInitJob
(
&
pJob
,
pDag
,
pTrans
,
pNodeList
,
sql
,
pRes
,
startTs
,
sync
));
*
job
=
pJob
->
refId
;
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
if
(
sync
)
{
SCH_JOB_DLOG
(
"will wait for rsp now, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
}
else
{
pJob
->
userCb
=
SCH_EXEC_CB
;
}
SCH_JOB_DLOG
(
"job exec done, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
_return:
schReleaseJob
(
pJob
->
refId
);
SCH_RET
(
code
);
}
return
TSDB_CODE_SUCCESS
;
int32_t
schExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SSchResInfo
*
pRes
)
{
int32_t
code
=
0
;
*
pJob
=
0
;
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_JRET
(
schExecStaticExplainJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
NULL
,
true
));
}
else
{
SCH_ERR_JRET
(
schExecJobImpl
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
NULL
,
startTs
,
true
));
}
_return:
schFreeJobImpl
(
pJob
);
SCH_RET
(
code
);
if
(
*
pJob
)
{
SSchJob
*
job
=
schAcquireJob
(
*
pJob
);
schSetJobQueryRes
(
job
,
pRes
->
queryRes
);
schReleaseJob
(
*
pJob
);
}
return
code
;
}
int32_t
schAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SSchResInfo
*
pRes
)
{
int32_t
code
=
0
;
*
pJob
=
0
;
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplainJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
pRes
,
false
));
}
else
{
SCH_ERR_RET
(
schExecJobImpl
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
pRes
,
startTs
,
false
));
}
return
code
;
}
int32_t
schExecStaticExplain
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
)
{
int32_t
schExecStaticExplain
Job
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
SSchResInfo
*
pRes
,
bool
sync
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
int32_t
code
=
0
;
...
...
@@ -1274,10 +1391,14 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
pJob
->
sql
=
sql
;
pJob
->
attr
.
queryJob
=
true
;
pJob
->
attr
.
syncSchedule
=
sync
;
pJob
->
attr
.
explainMode
=
pDag
->
explainInfo
.
mode
;
pJob
->
queryId
=
pDag
->
queryId
;
pJob
->
subPlans
=
pDag
->
pSubplans
;
if
(
pRes
)
{
pJob
->
userRes
=
*
pRes
;
}
SCH_ERR_JRET
(
qExecStaticExplain
(
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
resData
));
int64_t
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
...
...
@@ -1288,7 +1409,7 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_
ERR_
RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
refId
=
refId
;
...
...
@@ -1296,12 +1417,17 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
SCH_JOB_DLOG
(
"job refId:%"
PRIx64
,
pJob
->
refId
);
pJob
->
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
*
job
=
pJob
->
refId
;
SCH_JOB_DLOG
(
"job exec done, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
if
(
!
pJob
->
attr
.
syncSchedule
)
{
code
=
schNotifyUserQueryRes
(
pJob
);
}
schReleaseJob
(
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
SCH_RET
(
code
)
;
_return:
...
...
@@ -1309,4 +1435,103 @@ _return:
SCH_RET
(
code
);
}
int32_t
schFetchRows
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
SCH_JOB_DLOG
(
"job already succeed, status:%s"
,
jobTaskStatusStr
(
status
));
goto
_return
;
}
else
if
(
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
))
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
}
SCH_ERR_JRET
(
schSetJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_RET
(
code
);
}
int32_t
schAsyncFetchRows
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
SCH_JOB_DLOG
(
"job already succeed, status:%s"
,
jobTaskStatusStr
(
status
));
goto
_return
;
}
else
if
(
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
)
{
SCH_ERR_JRET
(
schNotifyUserFetchRes
(
pJob
));
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
}
else
{
pJob
->
userCb
=
SCH_FETCH_CB
;
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
}
return
TSDB_CODE_SUCCESS
;
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_RET
(
code
);
}
source/libs/scheduler/src/schRemote.c
浏览文件 @
e27f8e8a
...
...
@@ -396,6 +396,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
_return:
if
(
pJob
)
{
schReleaseJob
(
pParam
->
refId
);
}
...
...
@@ -450,7 +451,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
if
(
head
->
isHbParam
)
{
SSchHbCallbackParam
*
hbParam
=
(
SSchHbCallbackParam
*
)
param
;
SSchTrans
trans
=
{.
transInst
=
hbParam
->
transport
,
.
trans
Handle
=
NULL
};
SSchTrans
trans
=
{.
pTrans
=
hbParam
->
pTrans
,
.
p
Handle
=
NULL
};
SCH_ERR_RET
(
schUpdateHbConnection
(
&
hbParam
->
nodeEpId
,
&
trans
));
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
hbParam
->
nodeEpId
));
...
...
@@ -481,7 +482,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
pJob
->
transport
;
param
->
transport
=
pJob
->
pTrans
;
msgSendInfo
->
param
=
param
;
msgSendInfo
->
fp
=
fp
;
...
...
@@ -556,7 +557,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param
->
nodeEpId
.
nodeId
=
addr
->
nodeId
;
memcpy
(
&
param
->
nodeEpId
.
ep
,
SCH_GET_CUR_EP
(
addr
),
sizeof
(
SEp
));
param
->
transport
=
pJob
->
transport
;
param
->
pTrans
=
pJob
->
pTrans
;
*
pParam
=
param
;
...
...
@@ -639,7 +640,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET
(
schGetCallbackFp
(
TDMT_VND_QUERY_HEARTBEAT
,
&
fp
));
param
->
nodeEpId
=
epId
;
param
->
transport
=
pJob
->
transport
;
param
->
pTrans
=
pJob
->
pTrans
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
fp
=
fp
;
...
...
@@ -668,7 +669,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
int32_t
code
=
0
;
SSchHbTrans
hb
=
{
0
};
hb
.
trans
.
transInst
=
pJob
->
transport
;
hb
.
trans
.
pTrans
=
pJob
->
pTrans
;
SCH_ERR_RET
(
schMakeHbRpcCtx
(
pJob
,
pTask
,
&
hb
.
rpcCtx
));
...
...
@@ -745,12 +746,12 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
param
->
transport
=
trans
.
transInst
;
param
->
transport
=
trans
.
pTrans
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
.
trans
Handle
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
.
p
Handle
;
pMsgSendInfo
->
msgType
=
msgType
;
pMsgSendInfo
->
fp
=
fp
;
...
...
@@ -758,13 +759,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SEpSet
epSet
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
memcpy
(
&
epSet
.
eps
[
0
],
&
nodeEpId
->
ep
,
sizeof
(
nodeEpId
->
ep
));
qDebug
(
"start to send hb msg,
instance:%p, handle:%p, fqdn:%s, port:%d"
,
trans
.
transInst
,
trans
.
trans
Handle
,
qDebug
(
"start to send hb msg,
pTrans:%p, pHandle:%p, fqdn:%s, port:%d"
,
trans
.
pTrans
,
trans
.
p
Handle
,
nodeEpId
->
ep
.
fqdn
,
nodeEpId
->
ep
.
port
);
code
=
asyncSendMsgToServerExt
(
trans
.
transInst
,
&
epSet
,
&
transporterId
,
pMsgSendInfo
,
true
,
&
rpcCtx
);
code
=
asyncSendMsgToServerExt
(
trans
.
pTrans
,
&
epSet
,
&
transporterId
,
pMsgSendInfo
,
true
,
&
rpcCtx
);
if
(
code
)
{
qError
(
"fail to send hb msg,
instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s"
,
trans
.
transInst
,
trans
.
trans
Handle
,
nodeEpId
->
ep
.
fqdn
,
nodeEpId
->
ep
.
port
,
code
,
tstrerror
(
code
));
qError
(
"fail to send hb msg,
pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s"
,
trans
.
pTrans
,
trans
.
p
Handle
,
nodeEpId
->
ep
.
fqdn
,
nodeEpId
->
ep
.
port
,
code
,
tstrerror
(
code
));
SCH_ERR_JRET
(
code
);
}
...
...
@@ -814,8 +815,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
memcpy
(
&
hb
->
trans
,
trans
,
sizeof
(
*
trans
));
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
qDebug
(
"hb connection updated, sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d,
instance:%p, h
andle:%p"
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
transInst
,
trans
->
trans
Handle
);
qDebug
(
"hb connection updated, sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d,
pTrans:%p, pH
andle:%p"
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
pTrans
,
trans
->
p
Handle
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -835,8 +836,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
}
SSchTrans
trans
=
{
0
};
trans
.
transInst
=
pParam
->
transport
;
trans
.
trans
Handle
=
pMsg
->
handle
;
trans
.
pTrans
=
pParam
->
transport
;
trans
.
p
Handle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
...
...
@@ -881,7 +882,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
pJob
->
transport
;
param
->
transport
=
pJob
->
pTrans
;
*
pParam
=
param
;
...
...
@@ -1036,15 +1037,15 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
->
trans
Handle
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
->
p
Handle
;
pMsgSendInfo
->
msgType
=
msgType
;
qDebug
(
"start to send %s msg to node[%d,%s,%d], refId:%"
PRIx64
"
instance:%p, h
andle:%p"
,
TMSG_INFO
(
msgType
),
qDebug
(
"start to send %s msg to node[%d,%s,%d], refId:%"
PRIx64
"
pTrans:%p, pH
andle:%p"
,
TMSG_INFO
(
msgType
),
ntohl
(((
SMsgHead
*
)
msg
)
->
vgId
),
epSet
->
eps
[
epSet
->
inUse
].
fqdn
,
epSet
->
eps
[
epSet
->
inUse
].
port
,
pJob
->
refId
,
trans
->
transInst
,
trans
->
trans
Handle
);
trans
->
pTrans
,
trans
->
p
Handle
);
int64_t
transporterId
=
0
;
code
=
asyncSendMsgToServerExt
(
trans
->
transInst
,
epSet
,
&
transporterId
,
pMsgSendInfo
,
persistHandle
,
ctx
);
code
=
asyncSendMsgToServerExt
(
trans
->
pTrans
,
epSet
,
&
transporterId
,
pMsgSendInfo
,
persistHandle
,
ctx
);
if
(
code
)
{
SCH_ERR_JRET
(
code
);
}
...
...
@@ -1210,12 +1211,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
msgType
);
SSchTrans
trans
=
{.
transInst
=
pJob
->
transport
,
.
trans
Handle
=
SCH_GET_TASK_HANDLE
(
pTask
)};
SSchTrans
trans
=
{.
pTrans
=
pJob
->
pTrans
,
.
p
Handle
=
SCH_GET_TASK_HANDLE
(
pTask
)};
SCH_ERR_JRET
(
schAsyncSendMsg
(
pJob
,
pTask
,
&
trans
,
&
epSet
,
msgType
,
msg
,
msgSize
,
persistHandle
,
(
rpcCtx
.
args
?
&
rpcCtx
:
NULL
)));
if
(
msgType
==
TDMT_VND_QUERY
)
{
SCH_ERR_RET
(
schRecordTaskExecNode
(
pJob
,
pTask
,
addr
,
trans
.
trans
Handle
));
SCH_ERR_RET
(
schRecordTaskExecNode
(
pJob
,
pTask
,
addr
,
trans
.
p
Handle
));
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
e27f8e8a
...
...
@@ -67,50 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
n
odeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int32_t
schedulerExecJob
(
void
*
pTrans
,
SArray
*
pN
odeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SQueryResult
*
pRes
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRes
)
{
if
(
NULL
==
pTrans
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRes
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
*
pJob
=
0
;
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplain
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
true
));
}
else
{
SCH_ERR_JRET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
startTs
,
true
));
}
_return:
if
(
*
pJob
)
{
SSchJob
*
job
=
schAcquireJob
(
*
pJob
);
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
pRes
->
res
=
job
->
queryRes
;
job
->
queryRes
=
NULL
;
schReleaseJob
(
*
pJob
);
}
return
code
;
SSchResInfo
resInfo
=
{.
queryRes
=
pRes
};
SCH_RET
(
schExecJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
startTs
,
&
resInfo
));
}
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplain
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
false
));
}
else
{
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
0
,
false
));
}
return
TSDB_CODE_SUCCESS
;
int32_t
schedulerAsyncExecJob
(
void
*
pTrans
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
schedulerExecCallback
fp
,
void
*
param
)
{
if
(
NULL
==
pTrans
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchResInfo
resInfo
=
{.
execFp
=
fp
,
.
userParam
=
param
};
SCH_RET
(
schAsyncExecJob
(
pTrans
,
pNodeList
,
pDag
,
pJob
,
sql
,
startTs
,
&
resInfo
));
}
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
...
...
@@ -125,76 +99,32 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%s"
,
jobTaskStatusStr
(
status
));
schReleaseJob
(
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
schReleaseJob
(
job
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
schReleaseJob
(
job
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
SCH_JOB_DLOG
(
"job already succeed, status:%s"
,
jobTaskStatusStr
(
status
));
goto
_return
;
}
else
if
(
status
==
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
if
(
!
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
))
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
}
}
else
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
pJob
->
attr
.
syncSchedule
=
true
;
pJob
->
userRes
.
fetchRes
=
pData
;
code
=
schFetchRows
(
pJob
);
if
(
pJob
->
resData
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_JRET
(
schChkUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
schReleaseJob
(
job
);
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
resData
,
*
pData
,
NULL
))
{
continue
;
}
SCH_RET
(
code
);
}
break
;
int32_t
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
if
(
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
NULL
==
*
pData
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
if
(
rsp
)
{
rsp
->
completed
=
1
;
}
*
pData
=
rsp
;
SCH_JOB_DLOG
(
"empty res and set query complete, code:%x"
,
code
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_JOB_DLOG
(
"fetch done, totalRows:%d, code:%s"
,
pJob
->
resNumOfRows
,
tstrerror
(
code
))
;
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
pJob
->
attr
.
syncSchedule
=
false
;
pJob
->
userRes
.
fetchFp
=
fp
;
pJob
->
userRes
.
userParam
=
param
;
code
=
schFetchRows
(
pJob
);
schReleaseJob
(
job
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
e27f8e8a
...
...
@@ -87,6 +87,11 @@ void schtInitLogFile() {
}
void
schtQueryCb
(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
assert
(
TSDB_CODE_SUCCESS
==
code
);
*
(
int32_t
*
)
param
=
1
;
}
void
schtBuildQueryDag
(
SQueryPlan
*
dag
)
{
uint64_t
qId
=
schtQueryId
;
...
...
@@ -485,6 +490,7 @@ void* schtRunJobThread(void *aa) {
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
uint32_t
jobFinished
=
0
;
int32_t
queryDone
=
0
;
while
(
!
schtTestStop
)
{
schtBuildQueryDag
(
&
dag
);
...
...
@@ -496,7 +502,8 @@ void* schtRunJobThread(void *aa) {
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
queryJobRefId
);
queryDone
=
0
;
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
queryJobRefId
,
"select * from tb"
,
0
,
schtQueryCb
,
&
queryDone
);
assert
(
code
==
0
);
pJob
=
schAcquireJob
(
queryJobRefId
);
...
...
@@ -595,6 +602,14 @@ void* schtRunJobThread(void *aa) {
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
}
taosUsleep
(
10000
);
}
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
...
...
@@ -667,8 +682,9 @@ TEST(queryTest, normalCase) {
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
job
);
int32_t
queryDone
=
0
;
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
,
"select * from tb"
,
0
,
schtQueryCb
,
&
queryDone
);
ASSERT_EQ
(
code
,
0
);
...
...
@@ -718,6 +734,14 @@ TEST(queryTest, normalCase) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
}
taosUsleep
(
10000
);
}
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
...
...
@@ -773,8 +797,9 @@ TEST(queryTest, readyFirstCase) {
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
job
);
int32_t
queryDone
=
0
;
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
,
"select * from tb"
,
0
,
schtQueryCb
,
&
queryDone
);
ASSERT_EQ
(
code
,
0
);
...
...
@@ -824,6 +849,13 @@ TEST(queryTest, readyFirstCase) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
}
taosUsleep
(
10000
);
}
TdThreadAttr
thattr
;
...
...
@@ -885,16 +917,17 @@ TEST(queryTest, flowCtrlCase) {
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
"select * from tb"
,
&
job
);
int32_t
queryDone
=
0
;
code
=
schedulerAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
,
"select * from tb"
,
0
,
schtQueryCb
,
&
queryDone
);
ASSERT_EQ
(
code
,
0
);
SSchJob
*
pJob
=
schAcquireJob
(
job
);
bool
q
uery
Done
=
false
;
bool
qDone
=
false
;
while
(
!
q
uery
Done
)
{
while
(
!
qDone
)
{
void
*
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
NULL
);
if
(
NULL
==
pIter
)
{
break
;
...
...
@@ -915,7 +948,7 @@ TEST(queryTest, flowCtrlCase) {
code
=
schHandleResponseMsg
(
pJob
,
task
,
TDMT_VND_RES_READY_RSP
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
}
else
{
q
uery
Done
=
true
;
qDone
=
true
;
break
;
}
...
...
@@ -923,6 +956,13 @@ TEST(queryTest, flowCtrlCase) {
}
}
while
(
true
)
{
if
(
queryDone
)
{
break
;
}
taosUsleep
(
10000
);
}
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录