Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f126e1e3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f126e1e3
编写于
10月 13, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
10月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17340 from taosdata/refact/code_format
refact: code format
上级
0ad70770
a014be5c
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
740 addition
and
751 deletion
+740
-751
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+11
-9
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+348
-295
source/libs/scheduler/src/schDbg.c
source/libs/scheduler/src/schDbg.c
+2
-4
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+60
-61
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+69
-82
source/libs/scheduler/src/schStatus.c
source/libs/scheduler/src/schStatus.c
+7
-9
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+34
-29
source/libs/scheduler/src/schUtil.c
source/libs/scheduler/src/schUtil.c
+23
-30
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+6
-6
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+177
-205
tools/scripts/codeFormat.sh
tools/scripts/codeFormat.sh
+2
-20
未找到文件。
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
f126e1e3
...
...
@@ -101,8 +101,8 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
static
SBlockData
*
loadLastBlock
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
int32_t
code
=
0
;
SSttBlockLoadInfo
*
pInfo
=
pIter
->
pBlockLoadInfo
;
if
(
pInfo
->
blockIndex
[
0
]
==
pIter
->
iSttBlk
)
{
SSttBlockLoadInfo
*
pInfo
=
pIter
->
pBlockLoadInfo
;
if
(
pInfo
->
blockIndex
[
0
]
==
pIter
->
iSttBlk
)
{
if
(
pInfo
->
currentLoadBlockIndex
!=
0
)
{
tsdbDebug
(
"current load index is set to 0, block index:%d, file index:%d, due to uid:%"
PRIu64
", load data, %s"
,
pIter
->
iSttBlk
,
pIter
->
iStt
,
pIter
->
uid
,
idStr
);
...
...
@@ -113,7 +113,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
if
(
pInfo
->
blockIndex
[
1
]
==
pIter
->
iSttBlk
)
{
if
(
pInfo
->
currentLoadBlockIndex
!=
1
)
{
tsdbDebug
(
"current load index is set to 1, block index:%d, file index:%d, due to uid:%"
PRIu64
", load data, %s"
,
tsdbDebug
(
"current load index is set to 1, block index:%d, file index:%d, due to uid:%"
PRIu64
", load data, %s"
,
pIter
->
iSttBlk
,
pIter
->
iStt
,
pIter
->
uid
,
idStr
);
pInfo
->
currentLoadBlockIndex
=
1
;
}
...
...
@@ -140,8 +140,10 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo
->
elapsedTime
+=
el
;
pInfo
->
loadBlocks
+=
1
;
tsdbDebug
(
"read last block, total load:%d, trigger by uid:%"
PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s"
,
pInfo
->
loadBlocks
,
pIter
->
uid
,
pIter
->
iStt
,
pIter
->
iSttBlk
,
pInfo
->
currentLoadBlockIndex
,
pBlock
,
el
,
idStr
);
tsdbDebug
(
"read last block, total load:%d, trigger by uid:%"
PRIu64
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s"
,
pInfo
->
loadBlocks
,
pIter
->
uid
,
pIter
->
iStt
,
pIter
->
iSttBlk
,
pInfo
->
currentLoadBlockIndex
,
pBlock
,
el
,
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
}
...
...
@@ -336,7 +338,7 @@ _exit:
void
tLDataIterClose
(
SLDataIter
*
pIter
)
{
taosMemoryFree
(
pIter
);
}
void
tLDataIterNextBlock
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
void
tLDataIterNextBlock
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
int32_t
oldIndex
=
pIter
->
iSttBlk
;
...
...
@@ -386,10 +388,10 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char* idStr) {
if
(
index
!=
-
1
)
{
pIter
->
iSttBlk
=
index
;
pIter
->
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
pIter
->
iSttBlk
);
tsdbDebug
(
"try next last file block:%d from %d, trigger by uid:%"
PRIu64
", file index:%d, %s"
,
pIter
->
iSttBlk
,
oldIndex
,
pIter
->
uid
,
pIter
->
iStt
,
idStr
);
tsdbDebug
(
"try next last file block:%d from %d, trigger by uid:%"
PRIu64
", file index:%d, %s"
,
pIter
->
iSttBlk
,
oldIndex
,
pIter
->
uid
,
pIter
->
iStt
,
idStr
);
}
else
{
tsdbDebug
(
"no more last block qualified, uid:%"
PRIu64
", file index::%d, %s"
,
pIter
->
uid
,
oldIndex
,
idStr
);
tsdbDebug
(
"no more last block qualified, uid:%"
PRIu64
", file index::%d, %s"
,
pIter
->
uid
,
oldIndex
,
idStr
);
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
f126e1e3
...
...
@@ -1645,7 +1645,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STSRow
*
pTSRow
=
NULL
;
SRowMerger
merge
=
{
0
};
TSDBROW
fRow
=
tMergeTreeGetRow
(
&
pLastBlockReader
->
mergeTree
);
tsdbTrace
(
"fRow ptr:%p, %d, uid:%"
PRIu64
", %s"
,
fRow
.
pBlockData
,
fRow
.
iRow
,
pLastBlockReader
->
uid
,
pReader
->
idStr
);
tsdbTrace
(
"fRow ptr:%p, %d, uid:%"
PRIu64
", %s"
,
fRow
.
pBlockData
,
fRow
.
iRow
,
pLastBlockReader
->
uid
,
pReader
->
idStr
);
// only last block exists
if
((
!
mergeBlockData
)
||
(
tsLastBlock
!=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
]))
{
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
f126e1e3
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schDbg.c
浏览文件 @
f126e1e3
...
...
@@ -16,19 +16,17 @@
#include "query.h"
#include "schInt.h"
tsem_t
schdRspSem
;
tsem_t
schdRspSem
;
SSchDebug
gSCHDebug
=
{
0
};
void
schdExecCallback
(
SExecResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
if
(
code
)
{
pResult
->
code
=
code
;
}
*
(
SExecResult
*
)
param
=
*
pResult
;
taosMemoryFree
(
pResult
);
tsem_post
(
&
schdRspSem
);
}
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
f126e1e3
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
void
schFreeFlowCtrl
(
SSchJob
*
pJob
)
{
...
...
@@ -25,14 +25,14 @@ void schFreeFlowCtrl(SSchJob *pJob) {
}
SSchFlowControl
*
ctrl
=
NULL
;
void
*
pIter
=
taosHashIterate
(
pJob
->
flowCtrl
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pJob
->
flowCtrl
,
NULL
);
while
(
pIter
)
{
ctrl
=
(
SSchFlowControl
*
)
pIter
;
if
(
ctrl
->
taskList
)
{
taosArrayDestroy
(
ctrl
->
taskList
);
}
pIter
=
taosHashIterate
(
pJob
->
flowCtrl
,
pIter
);
}
...
...
@@ -59,7 +59,8 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return
TSDB_CODE_SUCCESS
;
}
pJob
->
flowCtrl
=
taosHashInit
(
pJob
->
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
flowCtrl
=
taosHashInit
(
pJob
->
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
flowCtrl
)
{
SCH_JOB_ELOG
(
"taosHashInit %d flowCtrl failed"
,
pJob
->
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -73,17 +74,17 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
}
int32_t
schDecTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
SSchLevel
*
pLevel
=
pTask
->
level
;
SSchLevel
*
pLevel
=
pTask
->
level
;
SSchFlowControl
*
ctrl
=
NULL
;
int32_t
code
=
0
;
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
int32_t
code
=
0
;
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
pJob
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SCH_LOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
if
(
ctrl
->
execTaskNum
<=
0
)
{
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
...
...
@@ -93,8 +94,8 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
--
ctrl
->
execTaskNum
;
ctrl
->
tableNumSum
-=
pTask
->
plan
->
execNodeStat
.
tableNum
;
SCH_TASK_DLOG
(
"task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_TASK_DLOG
(
"task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
_return:
...
...
@@ -104,11 +105,11 @@ _return:
}
int32_t
schCheckIncTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
enough
)
{
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
code
=
0
;
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
code
=
0
;
SSchFlowControl
*
ctrl
=
NULL
;
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
do
{
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
pJob
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
...
...
@@ -119,34 +120,34 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
if
(
HASH_NODE_EXIST
(
code
))
{
continue
;
}
SCH_TASK_ELOG
(
"taosHashPut flowCtrl failed, size:%d"
,
(
int32_t
)
sizeof
(
nctrl
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
nctrl
.
tableNumSum
,
nctrl
.
execTaskNum
);
SCH_TASK_DLOG
(
"task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
nctrl
.
tableNumSum
,
nctrl
.
execTaskNum
);
*
enough
=
true
;
return
TSDB_CODE_SUCCESS
;
}
SCH_LOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
if
(
0
==
ctrl
->
execTaskNum
)
{
ctrl
->
tableNumSum
=
pTask
->
plan
->
execNodeStat
.
tableNum
;
++
ctrl
->
execTaskNum
;
*
enough
=
true
;
break
;
}
int32_t
sum
=
pTask
->
plan
->
execNodeStat
.
tableNum
+
ctrl
->
tableNumSum
;
if
(
sum
<=
schMgmt
.
cfg
.
maxNodeTableNum
)
{
ctrl
->
tableNumSum
=
sum
;
++
ctrl
->
execTaskNum
;
*
enough
=
true
;
break
;
}
...
...
@@ -166,24 +167,25 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
*
enough
=
false
;
ctrl
->
sorted
=
false
;
break
;
}
while
(
true
);
_return:
SCH_TASK_DLOG
(
"task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
((
*
enough
)
?
""
:
"NOT"
),
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_TASK_DLOG
(
"task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
((
*
enough
)
?
""
:
"NOT"
),
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_UNLOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
SCH_RET
(
code
);
}
int32_t
schTaskTableNumCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
int32_t
schTaskTableNumCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
SSchTask
*
pTask1
=
*
(
SSchTask
**
)
key1
;
SSchTask
*
pTask2
=
*
(
SSchTask
**
)
key2
;
if
(
pTask1
->
plan
->
execNodeStat
.
tableNum
<
pTask2
->
plan
->
execNodeStat
.
tableNum
)
{
return
1
;
}
else
if
(
pTask1
->
plan
->
execNodeStat
.
tableNum
>
pTask2
->
plan
->
execNodeStat
.
tableNum
)
{
...
...
@@ -193,22 +195,21 @@ int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
}
}
int32_t
schLaunchTasksInFlowCtrlListImpl
(
SSchJob
*
pJob
,
SSchFlowControl
*
ctrl
)
{
SCH_LOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
if
(
NULL
==
ctrl
->
taskList
||
taosArrayGetSize
(
ctrl
->
taskList
)
<=
0
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
remainNum
=
schMgmt
.
cfg
.
maxNodeTableNum
-
ctrl
->
tableNumSum
;
int32_t
taskNum
=
taosArrayGetSize
(
ctrl
->
taskList
);
int32_t
code
=
0
;
int32_t
remainNum
=
schMgmt
.
cfg
.
maxNodeTableNum
-
ctrl
->
tableNumSum
;
int32_t
taskNum
=
taosArrayGetSize
(
ctrl
->
taskList
);
int32_t
code
=
0
;
SSchTask
*
pTask
=
NULL
;
if
(
taskNum
>
1
&&
!
ctrl
->
sorted
)
{
taosArraySort
(
ctrl
->
taskList
,
schTaskTableNumCompare
);
// desc order
taosArraySort
(
ctrl
->
taskList
,
schTaskTableNumCompare
);
// desc order
}
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
...
...
@@ -216,36 +217,36 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
if
(
pTask
->
plan
->
execNodeStat
.
tableNum
>
remainNum
&&
ctrl
->
execTaskNum
>
0
)
{
SCH_TASK_DLOG
(
"task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_TASK_DLOG
(
"task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
continue
;
}
ctrl
->
tableNumSum
+=
pTask
->
plan
->
execNodeStat
.
tableNum
;
++
ctrl
->
execTaskNum
;
taosArrayRemove
(
ctrl
->
taskList
,
i
);
SCH_TASK_DLOG
(
"task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_TASK_DLOG
(
"task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
pTask
->
plan
->
execNodeStat
.
tableNum
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_ERR_JRET
(
schAsyncLaunchTaskImpl
(
pJob
,
pTask
));
remainNum
-=
pTask
->
plan
->
execNodeStat
.
tableNum
;
if
(
remainNum
<=
0
)
{
SCH_TASK_DLOG
(
"no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
SCH_TASK_DLOG
(
"no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d"
,
ep
->
fqdn
,
ep
->
port
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
);
break
;
}
if
(
i
<
(
taskNum
-
1
))
{
SSchTask
*
pLastTask
=
*
(
SSchTask
**
)
taosArrayGetLast
(
ctrl
->
taskList
);
if
(
remainNum
<
pLastTask
->
plan
->
execNodeStat
.
tableNum
)
{
SCH_TASK_DLOG
(
"no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d"
,
ep
->
fqdn
,
ep
->
port
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
,
pLastTask
->
plan
->
execNodeStat
.
tableNum
);
SCH_TASK_DLOG
(
"no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d"
,
ep
->
fqdn
,
ep
->
port
,
ctrl
->
tableNumSum
,
ctrl
->
execTaskNum
,
pLastTask
->
plan
->
execNodeStat
.
tableNum
);
break
;
}
}
...
...
@@ -253,7 +254,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
--
i
;
--
taskNum
;
}
_return:
SCH_UNLOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
...
...
@@ -261,11 +262,10 @@ _return:
if
(
code
)
{
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
}
SCH_RET
(
code
);
}
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -274,17 +274,16 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schDecTaskFlowQuota
(
pJob
,
pTask
));
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SSchFlowControl
*
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
pJob
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
int32_t
code
=
schLaunchTasksInFlowCtrlListImpl
(
pJob
,
ctrl
);;
SCH_ERR_RET
(
code
);
return
code
;
// to avoid compiler error
}
int32_t
code
=
schLaunchTasksInFlowCtrlListImpl
(
pJob
,
ctrl
);
;
SCH_ERR_RET
(
code
);
return
code
;
// to avoid compiler error
}
source/libs/scheduler/src/schJob.c
浏览文件 @
f126e1e3
...
...
@@ -52,9 +52,8 @@ _return:
bool
schJobDone
(
SSchJob
*
pJob
)
{
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
return
(
status
==
JOB_TASK_STATUS_FAIL
||
status
==
JOB_TASK_STATUS_DROP
||
status
==
JOB_TASK_STATUS_SUCC
);
return
(
status
==
JOB_TASK_STATUS_FAIL
||
status
==
JOB_TASK_STATUS_DROP
||
status
==
JOB_TASK_STATUS_SUCC
);
}
FORCE_INLINE
bool
schJobNeedToStop
(
SSchJob
*
pJob
,
int8_t
*
pStatus
)
{
...
...
@@ -221,7 +220,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"parents info, the %d parent TID 0x%"
PRIx64
,
n
,
(
*
parentTask
)
->
taskId
);
SCH_TASK_DLOG
(
"parents info, the %d parent TID 0x%"
PRIx64
,
n
,
(
*
parentTask
)
->
taskId
);
}
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
...
...
@@ -235,7 +234,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
0
);
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
0
);
if
(
SUBPLAN_TYPE_MODIFY
!=
pTask
->
plan
->
subplanType
)
{
pJob
->
attr
.
needFetch
=
true
;
}
...
...
@@ -244,7 +243,6 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAppendJobDataSrc
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
SCH_IS_DATA_BIND_QRY_TASK
(
pTask
))
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -255,7 +253,6 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
pJob
->
queryId
=
pDag
->
queryId
;
...
...
@@ -365,7 +362,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET
(
schBuildTaskRalation
(
pJob
,
planToTask
));
_return:
if
(
planToTask
)
{
taosHashCleanup
(
planToTask
);
}
...
...
@@ -373,8 +370,7 @@ _return:
SCH_RET
(
code
);
}
int32_t
schDumpJobExecRes
(
SSchJob
*
pJob
,
SExecResult
*
pRes
)
{
int32_t
schDumpJobExecRes
(
SSchJob
*
pJob
,
SExecResult
*
pRes
)
{
pRes
->
code
=
atomic_load_32
(
&
pJob
->
errCode
);
pRes
->
numOfRows
=
pJob
->
resNumOfRows
;
pRes
->
res
=
pJob
->
execRes
.
res
;
...
...
@@ -387,13 +383,13 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDumpJobFetchRes
(
SSchJob
*
pJob
,
void
**
pData
)
{
int32_t
schDumpJobFetchRes
(
SSchJob
*
pJob
,
void
**
pData
)
{
int32_t
code
=
0
;
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
pJob
->
fetched
=
true
;
if
(
pJob
->
fetchRes
&&
((
SRetrieveTableRsp
*
)
pJob
->
fetchRes
)
->
completed
)
{
SCH_ERR_JRET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCC
,
NULL
));
}
...
...
@@ -422,12 +418,12 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
_return:
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
return
code
;
}
int32_t
schNotifyUserExecRes
(
SSchJob
*
pJob
)
{
SExecResult
*
pRes
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecResult
));
int32_t
schNotifyUserExecRes
(
SSchJob
*
pJob
)
{
SExecResult
*
pRes
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecResult
));
if
(
pRes
)
{
schDumpJobExecRes
(
pJob
,
pRes
);
}
...
...
@@ -439,9 +435,9 @@ int32_t schNotifyUserExecRes(SSchJob* pJob) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schNotifyUserFetchRes
(
SSchJob
*
pJob
)
{
void
*
pRes
=
NULL
;
int32_t
schNotifyUserFetchRes
(
SSchJob
*
pJob
)
{
void
*
pRes
=
NULL
;
schDumpJobFetchRes
(
pJob
,
&
pRes
);
SCH_JOB_DLOG
(
"sch start to invoke fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
...
...
@@ -453,17 +449,17 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
void
schPostJobRes
(
SSchJob
*
pJob
,
SCH_OP_TYPE
op
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
if
(
SCH_OP_NULL
==
pJob
->
opStatus
.
op
)
{
SCH_JOB_DLOG
(
"job not in any operation, no need to post job res, status:%s"
,
jobTaskStatusStr
(
pJob
->
status
));
goto
_return
;
}
if
(
op
&&
pJob
->
opStatus
.
op
!=
op
)
{
SCH_JOB_ELOG
(
"job in operation %s mis-match with expected %s"
,
schGetOpStr
(
pJob
->
opStatus
.
op
),
schGetOpStr
(
op
));
goto
_return
;
}
if
(
SCH_JOB_IN_SYNC_OP
(
pJob
))
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
tsem_post
(
&
pJob
->
rspSem
);
...
...
@@ -487,7 +483,7 @@ _return:
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
schUpdateJobErrCode
(
pJob
,
errCode
);
int32_t
code
=
atomic_load_32
(
&
pJob
->
errCode
);
if
(
code
)
{
SCH_JOB_DLOG
(
"job failed with error %s"
,
tstrerror
(
code
));
...
...
@@ -507,9 +503,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
return
TSDB_CODE_SCH_IGNORE_ERROR
;
}
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
errCode
));
}
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
errCode
));
}
int32_t
schHandleJobDrop
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
if
(
TSDB_CODE_SCH_IGNORE_ERROR
==
errCode
)
{
...
...
@@ -520,8 +514,7 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
return
TSDB_CODE_SCH_IGNORE_ERROR
;
}
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
if
(
schChkCurrentOp
(
pJob
,
SCH_OP_FETCH
,
-
1
))
{
SCH_ERR_RET
(
schLaunchFetchTask
(
pJob
));
}
else
{
...
...
@@ -531,9 +524,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
void
schProcessOnDataFetched
(
SSchJob
*
pJob
)
{
schPostJobRes
(
pJob
,
SCH_OP_FETCH
);
}
void
schProcessOnDataFetched
(
SSchJob
*
pJob
)
{
schPostJobRes
(
pJob
,
SCH_OP_FETCH
);
}
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
)
{
SCH_TASK_DLOG
(
"got explain rsp, rows:%d, complete:%d"
,
htonl
(
pRsp
->
numOfRows
),
pRsp
->
completed
);
...
...
@@ -548,14 +539,13 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return
TSDB_CODE_SUCCESS
;
}
int32_t
schLaunchJobLowerLevel
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
SCH_IS_QUERY_JOB
(
pJob
))
{
return
TSDB_CODE_SUCCESS
;
}
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
taskDoneNum
,
1
);
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
taskDoneNum
,
1
);
if
(
doneNum
==
pLevel
->
taskNum
)
{
pJob
->
levelIdx
--
;
...
...
@@ -566,7 +556,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
if
(
pTask
->
children
&&
taosArrayGetSize
(
pTask
->
children
)
>
0
)
{
continue
;
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
}
}
...
...
@@ -577,11 +567,11 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
int32_t
schSaveJobExecRes
(
SSchJob
*
pJob
,
SQueryTableRsp
*
rsp
)
{
if
(
rsp
->
tbFName
[
0
])
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
NULL
==
pJob
->
execRes
.
res
)
{
pJob
->
execRes
.
res
=
taosArrayInit
(
pJob
->
taskNum
,
sizeof
(
STbVerInfo
));
if
(
NULL
==
pJob
->
execRes
.
res
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
}
...
...
@@ -610,7 +600,6 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schLaunchJob
(
SSchJob
*
pJob
)
{
if
(
EXPLAIN_MODE_STATIC
==
pJob
->
attr
.
explainMode
)
{
SCH_ERR_RET
(
qExecStaticExplain
(
pJob
->
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
fetchRes
));
...
...
@@ -623,11 +612,10 @@ int32_t schLaunchJob(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
void
schDropJobAllTasks
(
SSchJob
*
pJob
)
{
schDropTaskInHashList
(
pJob
,
pJob
->
execTasks
);
// schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks);
// schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks);
}
void
schFreeJobImpl
(
void
*
job
)
{
...
...
@@ -659,10 +647,10 @@ void schFreeJobImpl(void *job) {
schFreeFlowCtrl
(
pJob
);
taosHashCleanup
(
pJob
->
execTasks
);
// taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks);
// taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks);
taosHashCleanup
(
pJob
->
taskList
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
taosArrayDestroy
(
pJob
->
dataSrcTasks
);
...
...
@@ -688,19 +676,19 @@ void schFreeJobImpl(void *job) {
}
int32_t
schJobFetchRows
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
!
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
))
{
SCH_ERR_RET
(
schLaunchFetchTask
(
pJob
));
if
(
schChkCurrentOp
(
pJob
,
SCH_OP_FETCH
,
true
))
{
SCH_JOB_DLOG
(
"sync wait for rsp now, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
SCH_RET
(
schDumpJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
SCH_RET
(
schDumpJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
}
}
else
{
if
(
schChkCurrentOp
(
pJob
,
SCH_OP_FETCH
,
true
))
{
SCH_RET
(
schDumpJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
SCH_RET
(
schDumpJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
}
else
{
schPostJobRes
(
pJob
,
SCH_OP_FETCH
);
}
...
...
@@ -736,9 +724,9 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
}
else
{
pJob
->
nodeList
=
taosArrayDup
(
pReq
->
pNodeList
);
}
pJob
->
taskList
=
taosHashInit
(
pReq
->
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
taskList
=
taosHashInit
(
pReq
->
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
taskList
)
{
SCH_JOB_ELOG
(
"taosHashInit %d taskList failed"
,
pReq
->
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -750,8 +738,8 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
SCH_ERR_JRET
(
qExecExplainBegin
(
pReq
->
pDag
,
&
pJob
->
explainCtx
,
pReq
->
startTs
));
}
pJob
->
execTasks
=
taosHashInit
(
pReq
->
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
execTasks
=
taosHashInit
(
pReq
->
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
execTasks
)
{
SCH_JOB_ELOG
(
"taosHashInit %d execTasks failed"
,
pReq
->
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -769,7 +757,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
*
pJobId
=
pJob
->
refId
;
SCH_JOB_DLOG
(
"job refId:0x%"
PRIx64
" created"
,
pJob
->
refId
);
SCH_JOB_DLOG
(
"job refId:0x%"
PRIx64
" created"
,
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -782,31 +770,31 @@ _return:
}
else
{
taosRemoveRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
}
SCH_RET
(
code
);
}
int32_t
schExecJob
(
SSchJob
*
pJob
,
SSchedulerReq
*
pReq
)
{
int32_t
code
=
0
;
qDebug
(
"QID:0x%"
PRIx64
" sch job refId 0x%"
PRIx64
" started"
,
pReq
->
pDag
->
queryId
,
pJob
->
refId
);
qDebug
(
"QID:0x%"
PRIx64
" sch job refId 0x%"
PRIx64
" started"
,
pReq
->
pDag
->
queryId
,
pJob
->
refId
);
SCH_ERR_RET
(
schLaunchJob
(
pJob
));
if
(
pReq
->
syncReq
)
{
SCH_JOB_DLOG
(
"sync wait for rsp now, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
}
SCH_JOB_DLOG
(
"job exec done, job status:%s, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
}
void
schDirectPostJobRes
(
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
void
schDirectPostJobRes
(
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
if
(
NULL
==
pReq
||
pReq
->
syncReq
)
{
return
;
}
if
(
pReq
->
execFp
)
{
(
*
pReq
->
execFp
)(
NULL
,
pReq
->
cbParam
,
errCode
);
}
else
if
(
pReq
->
fetchFp
)
{
...
...
@@ -827,16 +815,17 @@ bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
return
r
;
}
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
int32_t
op
=
0
;
switch
(
type
)
{
case
SCH_OP_EXEC
:
if
(
pReq
&&
pReq
->
syncReq
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
op
=
atomic_val_compare_exchange_32
(
&
pJob
->
opStatus
.
op
,
type
,
SCH_OP_NULL
);
if
(
SCH_OP_NULL
==
op
||
op
!=
type
)
{
SCH_JOB_ELOG
(
"job not in %s operation, op:%s, status:%s"
,
schGetOpStr
(
type
),
schGetOpStr
(
op
),
jobTaskStatusStr
(
pJob
->
status
));
SCH_JOB_ELOG
(
"job not in %s operation, op:%s, status:%s"
,
schGetOpStr
(
type
),
schGetOpStr
(
op
),
jobTaskStatusStr
(
pJob
->
status
));
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
schDumpJobExecRes
(
pJob
,
pReq
->
pExecRes
);
...
...
@@ -847,7 +836,8 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
op
=
atomic_val_compare_exchange_32
(
&
pJob
->
opStatus
.
op
,
type
,
SCH_OP_NULL
);
if
(
SCH_OP_NULL
==
op
||
op
!=
type
)
{
SCH_JOB_ELOG
(
"job not in %s operation, op:%s, status:%s"
,
schGetOpStr
(
type
),
schGetOpStr
(
op
),
jobTaskStatusStr
(
pJob
->
status
));
SCH_JOB_ELOG
(
"job not in %s operation, op:%s, status:%s"
,
schGetOpStr
(
type
),
schGetOpStr
(
op
),
jobTaskStatusStr
(
pJob
->
status
));
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
}
...
...
@@ -866,10 +856,10 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_JOB_DLOG
(
"job end %s operation with code %s"
,
schGetOpStr
(
type
),
tstrerror
(
errCode
));
}
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
)
{
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
)
{
int32_t
code
=
0
;
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
switch
(
type
)
{
case
SCH_OP_EXEC
:
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
...
...
@@ -879,9 +869,9 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
schDirectPostJobRes
(
pReq
,
TSDB_CODE_TSC_APP_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
SCH_JOB_DLOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
pJob
->
opStatus
.
syncReq
=
pReq
->
syncReq
;
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
break
;
...
...
@@ -893,16 +883,16 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
schDirectPostJobRes
(
pReq
,
TSDB_CODE_TSC_APP_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
SCH_JOB_DLOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
pJob
->
userRes
.
fetchRes
=
pReq
->
pFetchRes
;
pJob
->
userRes
.
fetchFp
=
pReq
->
fetchFp
;
pJob
->
userRes
.
cbParam
=
pReq
->
cbParam
;
pJob
->
opStatus
.
syncReq
=
pReq
->
syncReq
;
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
opStatus
.
lock
);
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
@@ -912,7 +902,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
break
;
case
SCH_OP_GET_STATUS
:
if
(
pJob
->
status
<
JOB_TASK_STATUS_INIT
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
...
...
@@ -941,23 +931,23 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
if
(
errCode
)
{
schHandleJobFailure
(
pJob
,
errCode
);
}
if
(
pJob
)
{
schReleaseJob
(
pJob
->
refId
);
}
}
int32_t
schProcessOnCbBegin
(
SSchJob
**
job
,
SSchTask
**
task
,
uint64_t
qId
,
int64_t
rId
,
uint64_t
tId
)
{
int32_t
schProcessOnCbBegin
(
SSchJob
**
job
,
SSchTask
**
task
,
uint64_t
qId
,
int64_t
rId
,
uint64_t
tId
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
int8_t
status
=
0
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
schAcquireJob
(
rId
);
SSchJob
*
pJob
=
schAcquireJob
(
rId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"job no exist, may be dropped, refId:0x%"
PRIx64
,
qId
,
tId
,
rId
);
SCH_ERR_RET
(
TSDB_CODE_QRY_JOB_NOT_EXIST
);
}
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"will not do further processing cause of job status %s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
...
...
@@ -980,9 +970,6 @@ _return:
if
(
pJob
)
{
schReleaseJob
(
rId
);
}
SCH_RET
(
code
);
}
source/libs/scheduler/src/schStatus.c
浏览文件 @
f126e1e3
...
...
@@ -29,25 +29,25 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case
JOB_TASK_STATUS_INIT
:
break
;
case
JOB_TASK_STATUS_EXEC
:
SCH_ERR_JRET
(
schExecJob
(
pJob
,
(
SSchedulerReq
*
)
param
));
SCH_ERR_JRET
(
schExecJob
(
pJob
,
(
SSchedulerReq
*
)
param
));
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
SCH_ERR_JRET
(
schProcessOnJobPartialSuccess
(
pJob
));
break
;
case
JOB_TASK_STATUS_SUCC
:
break
;
case
JOB_TASK_STATUS_FAIL
:
case
JOB_TASK_STATUS_FAIL
:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
(
param
?
*
(
int32_t
*
)
param
:
0
)));
break
;
case
JOB_TASK_STATUS_DROP
:
schProcessOnJobDropped
(
pJob
,
*
(
int32_t
*
)
param
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
pJob
->
refId
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
pJob
->
refId
);
}
else
{
SCH_JOB_DLOG
(
"job removed from jobRef list, refId:0x%"
PRIx64
,
pJob
->
refId
);
}
break
;
break
;
default:
{
SCH_JOB_ELOG
(
"unknown job status %d"
,
status
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -62,7 +62,7 @@ _return:
}
int32_t
schHandleOpBeginEvent
(
int64_t
jobId
,
SSchJob
**
job
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
)
{
SSchJob
*
pJob
=
schAcquireJob
(
jobId
);
SSchJob
*
pJob
=
schAcquireJob
(
jobId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"Acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
jobId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -75,12 +75,12 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS
int32_t
schHandleOpEndEvent
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
int32_t
code
=
errCode
;
if
(
NULL
==
pJob
)
{
schDirectPostJobRes
(
pReq
,
errCode
);
SCH_RET
(
code
);
}
schProcessOnOpEnd
(
pJob
,
type
,
pReq
,
errCode
);
if
(
TSDB_CODE_SCH_IGNORE_ERROR
==
errCode
)
{
...
...
@@ -91,5 +91,3 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
return
code
;
}
source/libs/scheduler/src/schTask.c
浏览文件 @
f126e1e3
...
...
@@ -16,12 +16,12 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "qworker.h"
#include "schInt.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "qworker.h"
#include "tglobal.h"
void
schFreeTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
schDeregisterTaskHb
(
pJob
,
pTask
);
...
...
@@ -94,7 +94,7 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
if
(
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pTask
))
{
return
TSDB_CODE_SUCCESS
;
}
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
if
(
NULL
==
addr
)
{
SCH_TASK_ELOG
(
"taosArrayGet candidate addr failed, idx:%d, size:%d"
,
pTask
->
candidateIdx
,
...
...
@@ -162,14 +162,15 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
if
(
dropExecNode
)
{
SCH_RET
(
schDropTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
));
}
schUpdateTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
);
if
((
execId
!=
pTask
->
execId
)
||
pTask
->
waitRetry
)
{
// ignore it
SCH_TASK_DLOG
(
"handle not updated since execId %d is already not current execId %d, waitRetry %d"
,
execId
,
pTask
->
execId
,
pTask
->
waitRetry
);
SCH_TASK_DLOG
(
"handle not updated since execId %d is already not current execId %d, waitRetry %d"
,
execId
,
pTask
->
execId
,
pTask
->
waitRetry
);
SCH_ERR_RET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
SCH_SET_TASK_HANDLE
(
pTask
,
handle
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -837,17 +838,18 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
goto
_return
;
}
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
NULL
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
resNum
;
++
i
)
{
SExplainLocalRsp
*
localRsp
=
taosArrayGet
(
pExplainRes
,
i
);
SExplainLocalRsp
*
localRsp
=
taosArrayGet
(
pExplainRes
,
i
);
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
", begin to handle LOCAL explain rsp msg"
,
localRsp
->
qId
,
localRsp
->
tId
);
pJob
=
schAcquireJob
(
localRsp
->
rId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"job no exist, may be dropped, refId:0x%"
PRIx64
,
localRsp
->
qId
,
localRsp
->
tId
,
localRsp
->
rId
);
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"job no exist, may be dropped, refId:0x%"
PRIx64
,
localRsp
->
qId
,
localRsp
->
tId
,
localRsp
->
rId
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_NOT_EXIST
);
}
...
...
@@ -857,16 +859,17 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
schReleaseJob
(
pJob
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
code
=
schGetTaskInJob
(
pJob
,
localRsp
->
tId
,
&
pTask
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
schProcessExplainRsp
(
pJob
,
pTask
,
&
localRsp
->
rsp
);
}
schReleaseJob
(
pJob
->
refId
);
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
", end to handle LOCAL explain rsp msg, code:%x"
,
localRsp
->
qId
,
localRsp
->
tId
,
code
);
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
", end to handle LOCAL explain rsp msg, code:%x"
,
localRsp
->
qId
,
localRsp
->
tId
,
code
);
SCH_ERR_JRET
(
code
);
...
...
@@ -879,7 +882,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
_return:
for
(
int32_t
i
=
0
;
i
<
resNum
;
++
i
)
{
SExplainLocalRsp
*
localRsp
=
taosArrayGet
(
pExplainRes
,
i
);
SExplainLocalRsp
*
localRsp
=
taosArrayGet
(
pExplainRes
,
i
);
tFreeSExplainRsp
(
&
localRsp
->
rsp
);
}
...
...
@@ -890,7 +893,7 @@ _return:
int32_t
schLaunchRemoteTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
SSubplan
*
plan
=
pTask
->
plan
;
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
NULL
==
pTask
->
msg
)
{
// TODO add more detailed reason for failure
code
=
qSubPlanToMsg
(
plan
,
&
pTask
->
msg
,
&
pTask
->
msgLen
);
...
...
@@ -899,7 +902,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
pTask
->
msgLen
);
SCH_ERR_RET
(
code
);
}
else
if
(
tsQueryPlannerTrace
)
{
char
*
msg
=
NULL
;
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
qSubPlanToString
(
plan
,
&
msg
,
&
msgLen
);
SCH_TASK_DLOGL
(
"physical plan len:%d, %s"
,
msgLen
,
msg
);
...
...
@@ -912,18 +915,18 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_RET
(
schEnsureHbConnection
(
pJob
,
pTask
));
}
SCH_RET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
plan
->
msgType
));
}
int32_t
schLaunchLocalTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
//
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if
(
NULL
==
schMgmt
.
queryMgmt
)
{
SCH_ERR_RET
(
qWorkerInit
(
NODE_TYPE_CLIENT
,
CLIENT_HANDLE
,
(
void
**
)
&
schMgmt
.
queryMgmt
,
NULL
));
}
SArray
*
explainRes
=
NULL
;
SQWMsg
qwMsg
=
{
0
};
SQWMsg
qwMsg
=
{
0
};
qwMsg
.
msgInfo
.
taskType
=
TASK_TYPE_TEMP
;
qwMsg
.
msgInfo
.
explain
=
SCH_IS_EXPLAIN_JOB
(
pJob
);
qwMsg
.
msgInfo
.
needFetch
=
SCH_TASK_NEED_FETCH
(
pTask
);
...
...
@@ -934,8 +937,9 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
explainRes
=
taosArrayInit
(
pJob
->
taskNum
,
sizeof
(
SExplainLocalRsp
));
}
SCH_ERR_RET
(
qWorkerProcessLocalQuery
(
schMgmt
.
queryMgmt
,
schMgmt
.
sId
,
pJob
->
queryId
,
pTask
->
taskId
,
pJob
->
refId
,
pTask
->
execId
,
&
qwMsg
,
explainRes
));
SCH_ERR_RET
(
qWorkerProcessLocalQuery
(
schMgmt
.
queryMgmt
,
schMgmt
.
sId
,
pJob
->
queryId
,
pTask
->
taskId
,
pJob
->
refId
,
pTask
->
execId
,
&
qwMsg
,
explainRes
));
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
SCH_ERR_RET
(
schHandleExplainRes
(
explainRes
));
...
...
@@ -958,17 +962,17 @@ int32_t schLaunchTaskImpl(void *param) {
if
(
pCtx
->
asyncLaunch
)
{
SCH_LOCK_TASK
(
pTask
);
}
int8_t
status
=
0
;
int32_t
code
=
0
;
int8_t
status
=
0
;
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
pTask
->
execId
++
;
pTask
->
retryTimes
++
;
pTask
->
waitRetry
=
false
;
SCH_TASK_DLOG
(
"start to launch %s task, execId %d, retry %d"
,
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pTask
)
?
"LOCAL"
:
"REMOTE"
,
pTask
->
execId
,
pTask
->
retryTimes
);
SCH_TASK_DLOG
(
"start to launch %s task, execId %d, retry %d"
,
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pTask
)
?
"LOCAL"
:
"REMOTE"
,
pTask
->
execId
,
pTask
->
retryTimes
);
SCH_LOG_TASK_START_TS
(
pTask
);
...
...
@@ -1086,19 +1090,20 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
schExecLocalFetch
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
void
*
pRsp
=
NULL
;
void
*
pRsp
=
NULL
;
SArray
*
explainRes
=
NULL
;
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
explainRes
=
taosArrayInit
(
pJob
->
taskNum
,
sizeof
(
SExplainLocalRsp
));
}
SCH_ERR_RET
(
qWorkerProcessLocalFetch
(
schMgmt
.
queryMgmt
,
schMgmt
.
sId
,
pJob
->
queryId
,
pTask
->
taskId
,
pJob
->
refId
,
pTask
->
execId
,
&
pRsp
,
explainRes
));
SCH_ERR_RET
(
qWorkerProcessLocalFetch
(
schMgmt
.
queryMgmt
,
schMgmt
.
sId
,
pJob
->
queryId
,
pTask
->
taskId
,
pJob
->
refId
,
pTask
->
execId
,
&
pRsp
,
explainRes
));
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
SCH_ERR_RET
(
schHandleExplainRes
(
explainRes
));
}
SCH_ERR_RET
(
schProcessFetchRsp
(
pJob
,
pTask
,
pRsp
,
TSDB_CODE_SUCCESS
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/schUtil.c
浏览文件 @
f126e1e3
...
...
@@ -21,21 +21,21 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"sch acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"sch acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
if
(
0
==
refId
)
{
return
TSDB_CODE_SUCCESS
;
}
qDebug
(
"sch release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
qDebug
(
"sch release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
char
*
schGetOpStr
(
SCH_OP_TYPE
type
)
{
char
*
schGetOpStr
(
SCH_OP_TYPE
type
)
{
switch
(
type
)
{
case
SCH_OP_NULL
:
return
"NULL"
;
...
...
@@ -53,28 +53,28 @@ char* schGetOpStr(SCH_OP_TYPE type) {
void
schFreeHbTrans
(
SSchHbTrans
*
pTrans
)
{
rpcReleaseHandle
(
pTrans
->
trans
.
pHandle
,
TAOS_CONN_CLIENT
);
schFreeRpcCtx
(
&
pTrans
->
rpcCtx
);
schFreeRpcCtx
(
&
pTrans
->
rpcCtx
);
}
void
schCleanClusterHb
(
void
*
pTrans
)
{
void
schCleanClusterHb
(
void
*
pTrans
)
{
SCH_LOCK
(
SCH_WRITE
,
&
schMgmt
.
hbLock
);
SSchHbTrans
*
hb
=
taosHashIterate
(
schMgmt
.
hbConnections
,
NULL
);
while
(
hb
)
{
if
(
hb
->
trans
.
pTrans
==
pTrans
)
{
SQueryNodeEpId
*
pEpId
=
taosHashGetKey
(
hb
,
NULL
);
SQueryNodeEpId
*
pEpId
=
taosHashGetKey
(
hb
,
NULL
);
schFreeHbTrans
(
hb
);
taosHashRemove
(
schMgmt
.
hbConnections
,
pEpId
,
sizeof
(
SQueryNodeEpId
));
}
hb
=
taosHashIterate
(
schMgmt
.
hbConnections
,
hb
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
schMgmt
.
hbLock
);
}
int32_t
schRemoveHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeEpId
*
epId
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SCH_LOCK
(
SCH_WRITE
,
&
schMgmt
.
hbLock
);
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
epId
,
sizeof
(
SQueryNodeEpId
));
...
...
@@ -94,7 +94,6 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAddHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeEpId
*
epId
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSchHbTrans
hb
=
{
0
};
...
...
@@ -155,13 +154,13 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if
(
!
pTask
->
registerdHb
)
{
return
;
}
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
SEp
*
pEp
=
SCH_GET_CUR_EP
(
addr
);
SEp
*
pEp
=
SCH_GET_CUR_EP
(
addr
);
strcpy
(
epId
.
ep
.
fqdn
,
pEp
->
fqdn
);
epId
.
ep
.
port
=
pEp
->
port
;
...
...
@@ -180,24 +179,22 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
}
else
{
SCH_UNLOCK
(
SCH_READ
,
&
schMgmt
.
hbLock
);
}
pTask
->
registerdHb
=
false
;
}
int32_t
schEnsureHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
SEp
*
pEp
=
SCH_GET_CUR_EP
(
addr
);
SEp
*
pEp
=
SCH_GET_CUR_EP
(
addr
);
strcpy
(
epId
.
ep
.
fqdn
,
pEp
->
fqdn
);
epId
.
ep
.
port
=
pEp
->
port
;
SCH_ERR_RET
(
schRegisterHbConnection
(
pJob
,
pTask
,
&
epId
));
pTask
->
registerdHb
=
true
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -226,7 +223,6 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
return
TSDB_CODE_SUCCESS
;
}
void
schCloseJobRef
(
void
)
{
if
(
!
atomic_load_8
((
int8_t
*
)
&
schMgmt
.
exit
))
{
return
;
...
...
@@ -242,7 +238,7 @@ uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
uint64_t
schGenUUID
(
void
)
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
]
=
{
0
};
...
...
@@ -254,15 +250,14 @@ uint64_t schGenUUID(void) {
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
void
schFreeRpcCtxVal
(
const
void
*
arg
)
{
if
(
NULL
==
arg
)
{
return
;
...
...
@@ -307,5 +302,3 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
return
TSDB_CODE_SUCCESS
;
}
source/libs/scheduler/src/scheduler.c
浏览文件 @
f126e1e3
...
...
@@ -14,10 +14,10 @@
*/
#include "query.h"
#include "qworker.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "qworker.h"
SSchedulerMgmt
schMgmt
=
{
.
jobRef
=
-
1
,
...
...
@@ -35,7 +35,7 @@ int32_t schedulerInit() {
schMgmt
.
cfg
.
enableReSchedule
=
true
;
qDebug
(
"schedule policy init to %d"
,
schMgmt
.
cfg
.
schPolicy
);
schMgmt
.
jobRef
=
taosOpenRef
(
schMgmt
.
cfg
.
maxJobNum
,
schFreeJobImpl
);
if
(
schMgmt
.
jobRef
<
0
)
{
qError
(
"init schduler jobRef failed, num:%u"
,
schMgmt
.
cfg
.
maxJobNum
);
...
...
@@ -61,7 +61,7 @@ int32_t schedulerInit() {
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJobId
)
{
qDebug
(
"scheduler %s exec job start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_JRET
(
schInitJob
(
pJobId
,
pReq
));
...
...
@@ -73,7 +73,7 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
SCH_ERR_JRET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
,
pReq
));
_return:
SCH_RET
(
schHandleOpEndEvent
(
pJob
,
SCH_OP_EXEC
,
pReq
,
code
));
}
...
...
@@ -144,7 +144,7 @@ int32_t schedulerEnableReSchedule(bool enableResche) {
return
TSDB_CODE_SUCCESS
;
}
void
schedulerFreeJob
(
int64_t
*
jobId
,
int32_t
errCode
)
{
void
schedulerFreeJob
(
int64_t
*
jobId
,
int32_t
errCode
)
{
if
(
0
==
*
jobId
)
{
return
;
}
...
...
@@ -158,7 +158,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
SCH_JOB_DLOG
(
"start to free job 0x%"
PRIx64
", errCode:0x%x"
,
*
jobId
,
errCode
);
schHandleJobDrop
(
pJob
,
errCode
);
schReleaseJob
(
*
jobId
);
*
jobId
=
0
;
}
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
f126e1e3
此差异已折叠。
点击以展开。
tools/scripts/codeFormat.sh
100644 → 100755
浏览文件 @
f126e1e3
...
...
@@ -12,25 +12,7 @@ FORMAT_DIR_LIST=(
"
${
PRJ_ROOT_DIR
}
/source/os"
"
${
PRJ_ROOT_DIR
}
/source/util"
"
${
PRJ_ROOT_DIR
}
/source/common"
"
${
PRJ_ROOT_DIR
}
/source/libs/cache"
"
${
PRJ_ROOT_DIR
}
/source/libs/catalog"
"
${
PRJ_ROOT_DIR
}
/source/libs/command"
"
${
PRJ_ROOT_DIR
}
/source/libs/executor"
"
${
PRJ_ROOT_DIR
}
/source/libs/function"
"
${
PRJ_ROOT_DIR
}
/source/libs/index"
"
${
PRJ_ROOT_DIR
}
/source/libs/monitor"
"
${
PRJ_ROOT_DIR
}
/source/libs/nodes"
# "${PRJ_ROOT_DIR}/source/libs/parser"
"
${
PRJ_ROOT_DIR
}
/source/libs/planner"
"
${
PRJ_ROOT_DIR
}
/source/libs/qcom"
"
${
PRJ_ROOT_DIR
}
/source/libs/qworker"
"
${
PRJ_ROOT_DIR
}
/source/libs/scalar"
"
${
PRJ_ROOT_DIR
}
/source/libs/stream"
"
${
PRJ_ROOT_DIR
}
/source/libs/sync"
"
${
PRJ_ROOT_DIR
}
/source/libs/tdb"
"
${
PRJ_ROOT_DIR
}
/source/libs/tfs"
"
${
PRJ_ROOT_DIR
}
/source/libs/transport"
"
${
PRJ_ROOT_DIR
}
/source/libs/wal"
"
${
PRJ_ROOT_DIR
}
/source/libs"
"
${
PRJ_ROOT_DIR
}
/source/client/inc"
"
${
PRJ_ROOT_DIR
}
/source/client/src"
"
${
PRJ_ROOT_DIR
}
/source/client/test"
...
...
@@ -45,7 +27,7 @@ EXCLUDE_FILE_LIST=(
)
for
d
in
${
FORMAT_DIR_LIST
[@]
}
;
do
for
f
in
$(
find
$d
-type
f
-regex
'.*\.\(cpp\|hpp\|c\|h\)'
)
;
do
for
f
in
$(
find
$d
-type
f
-
not
-name
'*sql.c'
-
regex
'.*\.\(cpp\|hpp\|c\|h\)'
)
;
do
${
FORMAT_BIN
}
-i
$f
done
done
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录