Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9627c67d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9627c67d
编写于
7月 12, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add vnode snapshot case
上级
7477103d
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
71 addition
and
76 deletion
+71
-76
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+14
-11
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+56
-64
tests/script/tsim/sync/vnodesnapshot-test.sim
tests/script/tsim/sync/vnodesnapshot-test.sim
+1
-1
未找到文件。
source/libs/scheduler/src/schRemote.c
浏览文件 @
9627c67d
...
...
@@ -375,7 +375,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
NULL
;
qDebug
(
"begin to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
qDebug
(
"begin to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
SCH_ERR_RET
(
schProcessOnCbBegin
(
&
pJob
,
&
pTask
,
pParam
->
queryId
,
pParam
->
refId
,
pParam
->
taskId
));
...
...
@@ -387,7 +388,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
taosMemoryFreeClear
(
pMsg
->
pData
);
taosMemoryFreeClear
(
param
);
qDebug
(
"end to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
qDebug
(
"end to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
SCH_RET
(
code
);
}
...
...
@@ -453,8 +455,8 @@ _return:
SCH_RET
(
code
);
}
int32_t
schMakeCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
bool
isHb
,
SSchTrans
*
trans
,
void
**
pParam
)
{
int32_t
schMakeCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
bool
isHb
,
SSchTrans
*
trans
,
void
**
pParam
)
{
if
(
!
isHb
)
{
SSchTaskCallbackParam
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchTaskCallbackParam
));
if
(
NULL
==
param
)
{
...
...
@@ -940,7 +942,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if
(
NULL
==
addr
)
{
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
isCandidateAddr
=
true
;
SCH_TASK_DLOG
(
"target candidateIdx %d"
,
pTask
->
candidateIdx
);
SCH_TASK_DLOG
(
"target candidateIdx %d, epInUse %d/%d"
,
pTask
->
candidateIdx
,
addr
->
epSet
.
inUse
,
addr
->
epSet
.
numOfEps
);
}
switch
(
msgType
)
{
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
9627c67d
...
...
@@ -21,8 +21,6 @@
#include "tref.h"
#include "trpc.h"
void
schFreeTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
schDeregisterTaskHb
(
pJob
,
pTask
);
...
...
@@ -45,7 +43,6 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
}
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
,
int32_t
levelNum
)
{
int32_t
code
=
0
;
...
...
@@ -55,7 +52,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
(
pLevel
->
level
,
levelNum
);
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execNodes
=
taosHashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTask
->
execNodes
=
taosHashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTask
->
profile
.
execTime
=
taosMemoryCalloc
(
pTask
->
maxExecTimes
,
sizeof
(
int64_t
));
if
(
NULL
==
pTask
->
execNodes
||
NULL
==
pTask
->
profile
.
execTime
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -204,8 +202,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_RET
(
errCode
);
}
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
bool
moved
=
false
;
...
...
@@ -265,7 +261,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t
readyNum
=
atomic_add_fetch_32
(
&
parent
->
childReady
,
1
);
SCH_LOCK_TASK
(
parent
);
SDownstreamSourceNode
source
=
{.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
SDownstreamSourceNode
source
=
{
.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
.
taskId
=
pTask
->
taskId
,
.
schedId
=
schMgmt
.
sId
,
.
execId
=
pTask
->
execId
,
...
...
@@ -291,8 +288,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
if
(
SCH_TASK_TIMEOUT
(
pTask
)
&&
JOB_TASK_STATUS_EXEC
==
pTask
->
status
&&
pJob
->
fetchTask
!=
pTask
&&
taosArrayGetSize
(
pTask
->
candidateAddrs
)
>
1
)
{
if
(
SCH_TASK_TIMEOUT
(
pTask
)
&&
JOB_TASK_STATUS_EXEC
==
pTask
->
status
&&
pJob
->
fetchTask
!=
pTask
&&
taosArrayGetSize
(
pTask
->
candidateAddrs
)
>
1
)
{
SCH_TASK_DLOG
(
"task execId %d will be rescheduled now"
,
pTask
->
execId
);
schDropTaskOnExecNode
(
pJob
,
pTask
);
taosHashClear
(
pTask
->
execNodes
);
...
...
@@ -303,12 +300,12 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId:%d"
,
pTask
->
execId
);
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_FAIL
,
(
void
*
)
&
rspCode
);
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_FAIL
,
(
void
*
)
&
rspCode
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -342,7 +339,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
return
TSDB_CODE_SUCCESS
;
}
// merge plan
pTask
->
childReady
=
0
;
...
...
@@ -358,7 +354,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
int32_t
childrenNum
=
taosArrayGetSize
(
pTask
->
children
);
for
(
int32_t
i
=
0
;
i
<
childrenNum
;
++
i
)
{
SSchTask
*
pChild
=
taosArrayGetP
(
pTask
->
children
,
i
);
SSchTask
*
pChild
=
taosArrayGetP
(
pTask
->
children
,
i
);
SCH_LOCK_TASK
(
pChild
);
schDoTaskRedirect
(
pJob
,
pChild
,
NULL
,
rspCode
);
SCH_UNLOCK_TASK
(
pChild
);
...
...
@@ -371,7 +367,7 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
int32_t
schHandleRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
schHandleRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
...
...
@@ -545,7 +541,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb
(
pJob
,
pTask
);
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
SCH_SWITCH_EPSET
(
&
pTask
->
plan
->
execNode
);
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SCH_SWITCH_EPSET
(
addr
);
}
else
{
SCH_ERR_RET
(
schSwitchTaskCandidateAddr
(
pJob
,
pTask
));
}
...
...
@@ -571,7 +568,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"set %dth candidate addr, id %d, fqdn:%s, port:%d"
,
i
,
naddr
->
nodeId
,
SCH_GET_CUR_EP
(
naddr
)
->
fqdn
,
SCH_GET_CUR_EP
(
naddr
)
->
port
);
SCH_TASK_DLOG
(
"set %dth candidate addr, id %d, fqdn:%s, port:%d"
,
i
,
naddr
->
nodeId
,
SCH_GET_CUR_EP
(
naddr
)
->
fqdn
,
SCH_GET_CUR_EP
(
naddr
)
->
port
);
++
addNum
;
}
...
...
@@ -585,7 +583,6 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSetTaskCandidateAddrs
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
NULL
!=
pTask
->
candidateAddrs
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -628,16 +625,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
)
{
int32_t
schUpdateTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
)
{
if
(
NULL
==
pTask
->
candidateAddrs
||
1
!=
taosArrayGetSize
(
pTask
->
candidateAddrs
))
{
SCH_TASK_ELOG
(
"not able to update cndidate addr, addr num %d"
,
(
int32_t
)(
pTask
->
candidateAddrs
?
taosArrayGetSize
(
pTask
->
candidateAddrs
)
:
0
));
SCH_TASK_ELOG
(
"not able to update cndidate addr, addr num %d"
,
(
int32_t
)(
pTask
->
candidateAddrs
?
taosArrayGetSize
(
pTask
->
candidateAddrs
)
:
0
));
SCH_ERR_RET
(
TSDB_CODE_APP_ERROR
);
}
SQueryNodeAddr
*
pAddr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
0
);
SQueryNodeAddr
*
pAddr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
0
);
SEp
*
pOld
=
&
pAddr
->
epSet
.
eps
[
pAddr
->
epSet
.
inUse
];
SEp
*
pNew
=
&
pEpSet
->
eps
[
pEpSet
->
inUse
];
SEp
*
pOld
=
&
pAddr
->
epSet
.
eps
[
pAddr
->
epSet
.
inUse
];
SEp
*
pNew
=
&
pEpSet
->
eps
[
pEpSet
->
inUse
];
SCH_TASK_DLOG
(
"update task ep from %s:%d to %s:%d"
,
pOld
->
fqdn
,
pOld
->
port
,
pNew
->
fqdn
,
pNew
->
port
);
...
...
@@ -655,8 +653,6 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schRemoveTaskFromExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
code
=
taosHashRemove
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
));
if
(
code
)
{
...
...
@@ -692,28 +688,27 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_DLOG
(
"task has been dropped on %d exec nodes"
,
size
);
}
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
)
{
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
)
{
int32_t
taskNum
=
(
int32_t
)
taosArrayGetSize
(
pStatusList
);
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
NULL
;
qDebug
(
"%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d"
,
taskNum
,
pEpId
->
nodeId
,
pEpId
->
ep
.
fqdn
,
pEpId
->
ep
.
port
);
qDebug
(
"%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d"
,
taskNum
,
pEpId
->
nodeId
,
pEpId
->
ep
.
fqdn
,
pEpId
->
ep
.
port
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
pStatus
=
taosArrayGet
(
pStatusList
,
i
);
int32_t
code
=
0
;
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
pStatus
->
queryId
,
pStatus
->
taskId
,
pStatus
->
execId
,
jobTaskStatusStr
(
pStatus
->
status
));
qDebug
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
pStatus
->
queryId
,
pStatus
->
taskId
,
pStatus
->
execId
,
jobTaskStatusStr
(
pStatus
->
status
));
if
(
schProcessOnCbBegin
(
&
pJob
,
&
pTask
,
pStatus
->
queryId
,
pStatus
->
refId
,
pStatus
->
taskId
))
{
continue
;
}
if
(
pStatus
->
execId
!=
pTask
->
execId
)
{
//TODO
//
TODO
SCH_TASK_DLOG
(
"execId %d mis-match current execId %d"
,
pStatus
->
execId
,
pTask
->
execId
);
schProcessOnCbEnd
(
pJob
,
pTask
,
0
);
continue
;
...
...
@@ -832,7 +827,6 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
}
}
// Note: no more error processing, handled in function internal
int32_t
schLaunchFetchTask
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
...
...
@@ -851,5 +845,3 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
));
}
tests/script/tsim/sync/vnodesnapshot-test.sim
浏览文件 @
9627c67d
...
...
@@ -201,7 +201,7 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep
3
000
sleep
7
000
print =============== query data
sql connect
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录