Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
195ca4ab
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
195ca4ab
编写于
3月 22, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/scheduler
上级
76b206fe
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
365 addition
and
57 deletion
+365
-57
include/common/tmsgdef.h
include/common/tmsgdef.h
+4
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+20
-6
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+334
-44
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+7
-7
未找到文件。
include/common/tmsgdef.h
浏览文件 @
195ca4ab
...
...
@@ -204,6 +204,10 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_DEPLOY
,
"snode-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
TD_DEF_MSG_TYPE
(
TDMT_SND_TASK_EXEC
,
"snode-task-exec"
,
SStreamTaskExecReq
,
SStreamTaskExecRsp
)
// Requests handled by SCHEDULER
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_LINK_BROKEN
,
"scheduler-link-broken"
,
NULL
,
NULL
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
#endif
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
195ca4ab
...
...
@@ -25,6 +25,7 @@ extern "C" {
#include "planner.h"
#include "scheduler.h"
#include "thash.h"
#include "trpc.h"
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
...
...
@@ -44,6 +45,7 @@ typedef struct SSchTrans {
typedef
struct
SSchHbTrans
{
SRWLatch
lock
;
SRpcCtx
rpcCtx
;
SSchTrans
trans
;
}
SSchHbTrans
;
...
...
@@ -75,12 +77,23 @@ typedef struct SSchedulerMgmt {
SHashObj
*
hbConnections
;
}
SSchedulerMgmt
;
typedef
struct
SSchCallbackParam
{
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
void
*
transport
;
}
SSchCallbackParam
;
typedef
struct
SSchCallbackParamHeader
{
bool
isHbParam
;
}
SSchCallbackParamHeader
;
typedef
struct
SSchTaskCallbackParam
{
SSchCallbackParamHeader
head
;
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
void
*
transport
;
}
SSchTaskCallbackParam
;
typedef
struct
SSchHbCallbackParam
{
SSchCallbackParamHeader
head
;
SQueryNodeEpId
nodeEpId
;
void
*
transport
;
}
SSchHbCallbackParam
;
typedef
struct
SSchFlowControl
{
SRWLatch
lock
;
...
...
@@ -227,6 +240,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
195ca4ab
...
...
@@ -74,12 +74,16 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
while
(
pIter
)
{
SRpcCtxVal
*
ctxVal
=
(
SRpcCtxVal
*
)
pIter
;
ctxVal
->
free
(
ctxVal
->
val
);
(
*
ctxVal
->
free
)
(
ctxVal
->
val
);
pIter
=
taosHashIterate
(
pCtx
->
args
,
pIter
);
}
taosHashCleanup
(
pCtx
->
args
);
if
(
pCtx
->
brokenVal
.
free
)
{
(
*
pCtx
->
brokenVal
.
free
)(
pCtx
->
brokenVal
.
val
);
}
}
void
schFreeTask
(
SSchTask
*
pTask
)
{
...
...
@@ -118,6 +122,8 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
int32_t
taskStatus
=
SCH_GET_TASK_STATUS
(
pTask
);
int32_t
reqMsgType
=
msgType
-
1
;
switch
(
msgType
)
{
case
TDMT_SCH_LINK_BROKEN
:
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_QUERY_RSP
:
// query_rsp may be processed later than ready_rsp
if
(
lastMsgType
!=
reqMsgType
)
{
SCH_TASK_DLOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
TMSG_INFO
(
lastMsgType
),
TMSG_INFO
(
msgType
));
...
...
@@ -659,26 +665,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schRegisterHbConnection
(
SQueryNodeEpId
*
epId
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSchHbTrans
hb
=
{
0
};
code
=
taosHashPut
(
schMgmt
.
hbConnections
,
epId
,
sizeof
(
SQueryNodeEpId
),
&
hb
,
sizeof
(
SSchHbTrans
));
if
(
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
*
exist
=
true
;
return
TSDB_CODE_SUCCESS
;
}
qError
(
"taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d"
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
);
SCH_ERR_RET
(
code
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateHbConnection
(
SQueryNodeEpId
*
epId
,
SSchHbTrans
*
trans
)
{
int32_t
schUpdateHbConnection
(
SQueryNodeEpId
*
epId
,
SSchTrans
*
trans
)
{
int32_t
code
=
0
;
SSchHbTrans
*
hb
=
NULL
;
...
...
@@ -689,13 +676,13 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
}
SCH_LOCK
(
SCH_WRITE
,
&
hb
->
lock
);
memcpy
(
&
hb
->
trans
,
&
trans
->
trans
,
sizeof
(
trans
->
trans
));
memcpy
(
&
hb
->
trans
,
trans
,
sizeof
(
*
trans
));
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
qDebug
(
"hb connection updated, sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p,
connection
:%p"
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
trans
.
trans
Inst
,
trans
->
trans
.
trans
Handle
);
", nodeId:%d, fqdn:%s, port:%d, instance:%p,
handle
:%p"
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
transInst
,
trans
->
transHandle
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1058,6 +1045,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
}
case
TDMT_SCH_LINK_BROKEN
:
SCH_TASK_ELOG
(
"link broken received, error:%x - %s"
,
rspCode
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
rspCode
);
break
;
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%d, status:%s"
,
msgType
,
SCH_GET_TASK_STATUS_STR
(
pTask
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -1072,7 +1063,7 @@ _return:
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSch
CallbackParam
*
pParam
=
(
SSch
CallbackParam
*
)
param
;
SSch
TaskCallbackParam
*
pParam
=
(
SSchTask
CallbackParam
*
)
param
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
...
...
@@ -1131,7 +1122,7 @@ int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code)
}
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSch
CallbackParam
*
pParam
=
(
SSch
CallbackParam
*
)
param
;
SSch
TaskCallbackParam
*
pParam
=
(
SSchTask
CallbackParam
*
)
param
;
qDebug
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" drop task rsp received, code:%x"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
}
...
...
@@ -1143,16 +1134,16 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp
rsp
=
{
0
};
SSch
CallbackParam
*
pParam
=
(
SSch
CallbackParam
*
)
param
;
SSch
HbCallbackParam
*
pParam
=
(
SSchHb
CallbackParam
*
)
param
;
if
(
tDeserializeSSchedulerHbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
rsp
))
{
qError
(
"invalid hb rsp msg, size:%d"
,
pMsg
->
len
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSch
Hb
Trans
trans
=
{
0
};
trans
.
trans
.
trans
Inst
=
pParam
->
transport
;
trans
.
trans
.
trans
Handle
=
pMsg
->
handle
;
SSchTrans
trans
=
{
0
};
trans
.
transInst
=
pParam
->
transport
;
trans
.
transHandle
=
pMsg
->
handle
;
SCH_RET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
...
...
@@ -1180,6 +1171,24 @@ _return:
SCH_RET
(
code
);
}
int32_t
schHandleLinkBrokenCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchCallbackParamHeader
*
head
=
(
SSchCallbackParamHeader
*
)
param
;
rpcReleaseHandle
(
pMsg
->
handle
,
TAOS_CONN_CLIENT
);
if
(
head
->
isHbParam
)
{
SSchHbCallbackParam
*
hbParam
=
(
SSchHbCallbackParam
*
)
param
;
SSchTrans
trans
=
{.
transInst
=
hbParam
->
transport
,
.
transHandle
=
NULL
};
SCH_ERR_RET
(
schUpdateHbConnection
(
&
hbParam
->
nodeEpId
,
&
trans
));
SCH_ERR_RET
(
schBuildAndSendHbMsg
(
&
hbParam
->
nodeEpId
));
}
else
{
SCH_ERR_RET
(
schHandleCallback
(
param
,
pMsg
,
TDMT_SCH_LINK_BROKEN
,
code
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
)
{
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE
:
...
...
@@ -1203,6 +1212,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case
TDMT_VND_QUERY_HEARTBEAT
:
*
fp
=
schHandleHbCallback
;
break
;
case
TDMT_SCH_LINK_BROKEN
:
*
fp
=
schHandleLinkBrokenCallback
;
break
;
default:
qError
(
"unknown msg type for callback, msgType:%d"
,
msgType
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
@@ -1221,9 +1233,84 @@ void schFreeRpcCtxVal(void *arg) {
tfree
(
pMsgSendInfo
);
}
int32_t
schMakeTaskCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
**
pParam
)
{
SSchTaskCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchTaskCallbackParam
));
if
(
NULL
==
param
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchTaskCallbackParam
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
pJob
->
transport
;
*
pParam
=
param
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMakeHbCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
**
pParam
)
{
SSchHbCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchHbCallbackParam
));
if
(
NULL
==
param
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchHbCallbackParam
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
param
->
head
.
isHbParam
=
true
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
param
->
nodeEpId
.
nodeId
=
addr
->
nodeId
;
memcpy
(
&
param
->
nodeEpId
.
ep
,
SCH_GET_CUR_EP
(
addr
),
sizeof
(
SEp
));
param
->
transport
=
pJob
->
transport
;
*
pParam
=
param
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
isHb
)
{
SCH_ERR_JRET
(
schMakeHbCallbackParam
(
pJob
,
pTask
,
&
pMsgSendInfo
->
param
));
}
else
{
SCH_ERR_JRET
(
schMakeTaskCallbackParam
(
pJob
,
pTask
,
&
pMsgSendInfo
->
param
));
}
int32_t
msgType
=
TDMT_SCH_LINK_BROKEN
;
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
pMsgSendInfo
->
fp
=
fp
;
brokenVal
->
msgType
=
msgType
;
brokenVal
->
val
=
pMsgSendInfo
;
brokenVal
->
len
=
sizeof
(
SMsgSendInfo
);
brokenVal
->
free
=
schFreeRpcCtxVal
;
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
pMsgSendInfo
->
param
);
tfree
(
pMsgSendInfo
);
SCH_RET
(
code
);
}
int32_t
schMakeQueryRpcCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcCtx
*
pCtx
)
{
int32_t
code
=
0
;
SSchCallbackParam
*
param
=
NULL
;
SSch
Task
CallbackParam
*
param
=
NULL
;
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
pCtx
->
args
=
taosHashInit
(
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_ENTRY_LOCK
);
...
...
@@ -1238,9 +1325,9 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
param
=
calloc
(
1
,
sizeof
(
SSchCallbackParam
));
param
=
calloc
(
1
,
sizeof
(
SSch
Task
CallbackParam
));
if
(
NULL
==
param
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSch
Task
CallbackParam
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -1262,6 +1349,8 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schMakeBrokenLinkVal
(
pJob
,
pTask
,
&
pCtx
->
brokenVal
,
false
));
return
TSDB_CODE_SUCCESS
;
_return:
...
...
@@ -1275,8 +1364,13 @@ _return:
int32_t
schMakeHbRpcCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcCtx
*
pCtx
)
{
int32_t
code
=
0
;
SSchCallbackParam
*
param
=
NULL
;
SSch
Hb
CallbackParam
*
param
=
NULL
;
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
memcpy
(
&
epId
.
ep
,
SCH_GET_CUR_EP
(
addr
),
sizeof
(
SEp
));
pCtx
->
args
=
taosHashInit
(
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCtx
->
args
)
{
...
...
@@ -1290,9 +1384,9 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
param
=
calloc
(
1
,
sizeof
(
SSchCallbackParam
));
param
=
calloc
(
1
,
sizeof
(
SSch
Hb
CallbackParam
));
if
(
NULL
==
param
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSch
Hb
CallbackParam
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -1300,9 +1394,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
TDMT_VND_QUERY_HEARTBEAT
,
&
fp
));
param
->
queryId
=
pJob
->
queryId
;
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
nodeEpId
=
epId
;
param
->
transport
=
pJob
->
transport
;
pMsgSendInfo
->
param
=
param
;
...
...
@@ -1314,6 +1406,8 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schMakeBrokenLinkVal
(
pJob
,
pTask
,
&
pCtx
->
brokenVal
,
true
));
return
TSDB_CODE_SUCCESS
;
_return:
...
...
@@ -1326,6 +1420,118 @@ _return:
}
int32_t
schRegisterHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeEpId
*
epId
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSchHbTrans
hb
=
{
0
};
hb
.
trans
.
transInst
=
pJob
->
transport
;
SCH_ERR_RET
(
schMakeHbRpcCtx
(
pJob
,
pTask
,
&
hb
.
rpcCtx
));
code
=
taosHashPut
(
schMgmt
.
hbConnections
,
epId
,
sizeof
(
SQueryNodeEpId
),
&
hb
,
sizeof
(
SSchHbTrans
));
if
(
code
)
{
schFreeRpcCtx
(
&
hb
.
rpcCtx
);
if
(
HASH_NODE_EXIST
(
code
))
{
*
exist
=
true
;
return
TSDB_CODE_SUCCESS
;
}
qError
(
"taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d"
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
);
SCH_ERR_RET
(
code
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schCloneCallbackParam
(
SSchCallbackParamHeader
*
pSrc
,
SSchCallbackParamHeader
**
pDst
)
{
if
(
pSrc
->
isHbParam
)
{
SSchHbCallbackParam
*
dst
=
malloc
(
sizeof
(
SSchHbCallbackParam
));
if
(
NULL
==
dst
)
{
qError
(
"malloc SSchHbCallbackParam failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
dst
,
pSrc
,
sizeof
(
*
dst
));
*
pDst
=
(
SSchCallbackParamHeader
*
)
dst
;
return
TSDB_CODE_SUCCESS
;
}
SSchTaskCallbackParam
*
dst
=
malloc
(
sizeof
(
SSchTaskCallbackParam
));
if
(
NULL
==
dst
)
{
qError
(
"malloc SSchTaskCallbackParam failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
dst
,
pSrc
,
sizeof
(
*
dst
));
*
pDst
=
(
SSchCallbackParamHeader
*
)
dst
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schCloneSMsgSendInfo
(
SMsgSendInfo
*
pSrc
,
SMsgSendInfo
**
pDst
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
dst
=
malloc
(
sizeof
(
*
pSrc
));
if
(
NULL
==
dst
)
{
qError
(
"malloc SMsgSendInfo for rpcCtx failed, len:%d"
,
(
int32_t
)
sizeof
(
*
pSrc
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
dst
,
pSrc
,
sizeof
(
*
pSrc
));
dst
->
param
=
NULL
;
SCH_ERR_JRET
(
schCloneCallbackParam
(
pSrc
->
param
,
(
SSchCallbackParamHeader
**
)
&
dst
->
param
));
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
dst
);
SCH_RET
(
code
);
}
int32_t
schCloneHbRpcCtx
(
SRpcCtx
*
pSrc
,
SRpcCtx
*
pDst
)
{
int32_t
code
=
0
;
memcpy
(
&
pDst
->
brokenVal
,
&
pSrc
->
brokenVal
,
sizeof
(
pSrc
->
brokenVal
));
pDst
->
brokenVal
.
val
=
NULL
;
SCH_ERR_RET
(
schCloneSMsgSendInfo
(
pSrc
->
brokenVal
.
val
,
(
SMsgSendInfo
**
)
&
pDst
->
brokenVal
.
val
));
pDst
->
args
=
taosHashInit
(
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pDst
->
args
)
{
qError
(
"taosHashInit %d RpcCtx failed"
,
1
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SRpcCtxVal
dst
=
{
0
};
void
*
pIter
=
taosHashIterate
(
pSrc
->
args
,
NULL
);
while
(
pIter
)
{
SRpcCtxVal
*
pVal
=
(
SRpcCtxVal
*
)
pIter
;
int32_t
*
msgType
=
taosHashGetKey
(
pIter
,
NULL
);
SCH_ERR_JRET
(
schCloneSMsgSendInfo
(
pVal
->
val
,
(
SMsgSendInfo
**
)
&
dst
.
val
));
if
(
taosHashPut
(
pDst
->
args
,
msgType
,
sizeof
(
*
msgType
),
pVal
,
sizeof
(
*
pVal
)))
{
qError
(
"taosHashPut msg %d to rpcCtx failed"
,
*
msgType
);
(
*
dst
.
free
)(
dst
.
val
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pIter
=
taosHashIterate
(
pSrc
->
args
,
pIter
);
}
return
TSDB_CODE_SUCCESS
;
_return:
schFreeRpcCtx
(
pDst
);
SCH_RET
(
code
);
}
int32_t
schAsyncSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
transport
,
SEpSet
*
epSet
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
,
bool
persistHandle
,
SRpcCtx
*
ctx
)
{
int32_t
code
=
0
;
...
...
@@ -1337,9 +1543,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSch
CallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSch
CallbackParam
));
SSch
TaskCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchTask
CallbackParam
));
if
(
NULL
==
param
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSch
Task
CallbackParam
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -1374,6 +1580,90 @@ _return:
SCH_RET
(
code
);
}
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
)
{
SSchedulerHbReq
req
=
{
0
};
int32_t
code
=
0
;
SRpcCtx
rpcCtx
=
{
0
};
SSchTrans
trans
=
{
0
};
int32_t
msgType
=
TDMT_VND_QUERY_HEARTBEAT
;
req
.
sId
=
schMgmt
.
sId
;
req
.
header
.
vgId
=
nodeEpId
->
nodeId
;
memcpy
(
&
req
.
epId
,
nodeEpId
,
sizeof
(
SQueryNodeEpId
));
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
nodeEpId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
qError
(
"taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d"
,
nodeEpId
->
nodeId
,
nodeEpId
->
ep
.
fqdn
,
nodeEpId
->
ep
.
port
);
SCH_ERR_RET
(
code
);
}
SCH_LOCK
(
SCH_WRITE
,
&
hb
->
lock
);
code
=
schCloneHbRpcCtx
(
&
hb
->
rpcCtx
,
&
rpcCtx
);
memcpy
(
&
trans
,
&
hb
->
trans
,
sizeof
(
trans
));
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
SCH_ERR_RET
(
code
);
int32_t
msgSize
=
tSerializeSSchedulerHbReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
qError
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
void
*
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc hb req %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
tSerializeSSchedulerHbReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
qError
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"calloc SMsgSendInfo failed"
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSchTaskCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchTaskCallbackParam
));
if
(
NULL
==
param
)
{
qError
(
"calloc SSchTaskCallbackParam failed"
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
param
->
transport
=
trans
.
transInst
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
.
transHandle
;
pMsgSendInfo
->
msgType
=
msgType
;
pMsgSendInfo
->
fp
=
fp
;
int64_t
transporterId
=
0
;
SEpSet
epSet
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
memcpy
(
&
epSet
.
eps
[
0
],
&
nodeEpId
->
ep
,
sizeof
(
nodeEpId
->
ep
));
code
=
asyncSendMsgToServerExt
(
trans
.
transInst
,
&
epSet
,
&
transporterId
,
pMsgSendInfo
,
true
,
&
rpcCtx
);
if
(
code
)
{
SCH_ERR_JRET
(
code
);
}
qDebug
(
"req msg sent, type:%d, %s"
,
msgType
,
TMSG_INFO
(
msgType
));
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
msg
);
tfree
(
param
);
tfree
(
pMsgSendInfo
);
schFreeRpcCtx
(
&
rpcCtx
);
SCH_RET
(
code
);
}
int32_t
schBuildAndSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
)
{
uint32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
...
...
@@ -1546,9 +1836,9 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
&
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
bool
exist
=
false
;
SCH_ERR_RET
(
schRegisterHbConnection
(
&
epId
,
&
exist
));
SCH_ERR_RET
(
schRegisterHbConnection
(
pJob
,
pTask
,
&
epId
,
&
exist
));
if
(
!
exist
)
{
SCH_ERR_RET
(
schBuildAndSend
Msg
(
pJob
,
NULL
,
addr
,
TDMT_VND_QUERY_HEARTBEAT
));
SCH_ERR_RET
(
schBuildAndSend
HbMsg
(
&
epId
));
}
}
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
195ca4ab
...
...
@@ -387,7 +387,7 @@ void *schtCreateFetchRspThread(void *param) {
void
*
schtFetchRspThread
(
void
*
aa
)
{
SDataBuf
dataBuf
=
{
0
};
SSchCallbackParam
*
param
=
NULL
;
SSch
Task
CallbackParam
*
param
=
NULL
;
while
(
!
schtTestStop
)
{
if
(
0
==
atomic_val_compare_exchange_32
(
&
schtStartFetch
,
1
,
0
))
{
...
...
@@ -396,7 +396,7 @@ void *schtFetchRspThread(void *aa) {
taosUsleep
(
1
);
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
=
(
SSch
Task
CallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
taskId
=
schtFetchTaskId
;
...
...
@@ -449,7 +449,7 @@ void* schtRunJobThread(void *aa) {
schtSetAsyncSendMsgToServer
();
SSchJob
*
pJob
=
NULL
;
SSchCallbackParam
*
param
=
NULL
;
SSch
Task
CallbackParam
*
param
=
NULL
;
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
uint32_t
jobFinished
=
0
;
...
...
@@ -484,7 +484,7 @@ void* schtRunJobThread(void *aa) {
pIter
=
taosHashIterate
(
pJob
->
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
=
(
SSch
Task
CallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
...
...
@@ -504,7 +504,7 @@ void* schtRunJobThread(void *aa) {
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
=
(
SSch
Task
CallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
...
...
@@ -524,7 +524,7 @@ void* schtRunJobThread(void *aa) {
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
=
(
SSch
Task
CallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
...
...
@@ -544,7 +544,7 @@ void* schtRunJobThread(void *aa) {
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
=
(
SSch
Task
CallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
refId
=
queryJobRefId
;
param
->
queryId
=
pJob
->
queryId
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录