Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bf0f843b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
bf0f843b
编写于
5月 23, 2022
作者:
D
dapan1121
提交者:
GitHub
5月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12883 from taosdata/feature/async.scheduler
feat: scheduler refactor
上级
a1b97b7a
679c9460
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
2664 addition
and
2697 deletion
+2664
-2697
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+27
-1
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+1
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1312
-0
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+1231
-0
source/libs/scheduler/src/schUtil.c
source/libs/scheduler/src/schUtil.c
+92
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-2695
未找到文件。
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
bf0f843b
...
...
@@ -264,7 +264,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, i
SSchJob
*
schAcquireJob
(
int64_t
refId
);
int32_t
schReleaseJob
(
int64_t
refId
);
void
schFreeFlowCtrl
(
SSchJob
*
pJob
);
int32_t
schCh
ec
kJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
);
int32_t
schChkJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
);
int32_t
schDecTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schCheckIncTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
enough
);
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
...
...
@@ -275,6 +275,32 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
int32_t
schCloneSMsgSendInfo
(
void
*
src
,
void
**
dst
);
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
);
void
schFreeJobImpl
(
void
*
job
);
int32_t
schMakeHbCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
**
pParam
);
int32_t
schMakeHbRpcCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcCtx
*
pCtx
);
int32_t
schEnsureHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schUpdateHbConnection
(
SQueryNodeEpId
*
epId
,
SSchTrans
*
trans
);
int32_t
schHandleHbCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
void
schFreeRpcCtx
(
SRpcCtx
*
pCtx
);
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
);
bool
schJobNeedToStop
(
SSchJob
*
pJob
,
int8_t
*
pStatus
);
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schSaveJobQueryRes
(
SSchJob
*
pJob
,
SResReadyRsp
*
rsp
);
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
);
void
schProcessOnDataFetched
(
SSchJob
*
job
);
int32_t
schGetTaskFromTaskList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
);
int32_t
schUpdateTaskExecNodeHandle
(
SSchTask
*
pTask
,
void
*
handle
,
int32_t
rspCode
);
void
schFreeRpcCtxVal
(
const
void
*
arg
);
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
);
int32_t
schRecordTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
void
*
handle
);
int32_t
schExecStaticExplain
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
);
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
int64_t
startTs
,
bool
sync
);
int32_t
schChkUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
int32_t
schCancelJob
(
SSchJob
*
pJob
);
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
);
uint64_t
schGenTaskId
(
void
);
void
schCloseJobRef
(
void
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
bf0f843b
...
...
@@ -40,7 +40,7 @@ void schFreeFlowCtrl(SSchJob *pJob) {
pJob
->
flowCtrl
=
NULL
;
}
int32_t
schCh
ec
kJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
)
{
int32_t
schChkJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
)
{
if
(
!
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_JOB_DLOG
(
"job no need flow ctrl, queryJob:%d"
,
SCH_IS_QUERY_JOB
(
pJob
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/schJob.c
0 → 100644
浏览文件 @
bf0f843b
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schRemote.c
0 → 100644
浏览文件 @
bf0f843b
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schUtil.c
0 → 100644
浏览文件 @
bf0f843b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
void
schCloseJobRef
(
void
)
{
if
(
!
atomic_load_8
((
int8_t
*
)
&
schMgmt
.
exit
))
{
return
;
}
SCH_LOCK
(
SCH_WRITE
,
&
schMgmt
.
lock
);
if
(
atomic_load_32
(
&
schMgmt
.
jobNum
)
<=
0
&&
schMgmt
.
jobRef
>=
0
)
{
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
-
1
;
}
SCH_UNLOCK
(
SCH_WRITE
,
&
schMgmt
.
lock
);
}
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
uint64_t
schGenUUID
(
void
)
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"Failed to get the system uid, reason:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
void
schFreeRpcCtxVal
(
const
void
*
arg
)
{
if
(
NULL
==
arg
)
{
return
;
}
SMsgSendInfo
*
pMsgSendInfo
=
(
SMsgSendInfo
*
)
arg
;
taosMemoryFreeClear
(
pMsgSendInfo
->
param
);
taosMemoryFreeClear
(
pMsgSendInfo
);
}
void
schFreeRpcCtx
(
SRpcCtx
*
pCtx
)
{
if
(
NULL
==
pCtx
)
{
return
;
}
void
*
pIter
=
taosHashIterate
(
pCtx
->
args
,
NULL
);
while
(
pIter
)
{
SRpcCtxVal
*
ctxVal
=
(
SRpcCtxVal
*
)
pIter
;
(
*
ctxVal
->
freeFunc
)(
ctxVal
->
val
);
pIter
=
taosHashIterate
(
pCtx
->
args
,
pIter
);
}
taosHashCleanup
(
pCtx
->
args
);
if
(
pCtx
->
brokenVal
.
freeFunc
)
{
(
*
pCtx
->
brokenVal
.
freeFunc
)(
pCtx
->
brokenVal
.
val
);
}
}
source/libs/scheduler/src/scheduler.c
浏览文件 @
bf0f843b
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录