Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eda3b192
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eda3b192
编写于
4月 24, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix compile
上级
f4728e82
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
138 addition
and
131 deletion
+138
-131
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+138
-131
未找到文件。
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
eda3b192
#include "qworker
Msg
.h"
#include "qworker.h"
#include "
dataSinkMgt
.h"
#include "
tcommon
.h"
#include "executor.h"
#include "executor.h"
#include "planner.h"
#include "planner.h"
#include "query.h"
#include "query.h"
#include "qworker.h"
#include "qworkerInt.h"
#include "qworkerInt.h"
#include "
tcommon
.h"
#include "
qworkerMsg
.h"
#include "tmsg.h"
#include "tmsg.h"
#include "tname.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
msgSize
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
msgSize
);
if
(
NULL
==
pRsp
)
{
if
(
NULL
==
pRsp
)
{
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
...
@@ -25,9 +26,11 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
...
@@ -25,9 +26,11 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
)
{
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
rsp
->
useconds
=
htobe64
(
input
->
useconds
);
rsp
->
useconds
=
htobe64
(
input
->
useconds
);
rsp
->
completed
=
qComplete
;
rsp
->
completed
=
qComplete
;
rsp
->
precision
=
input
->
precision
;
rsp
->
precision
=
input
->
precision
;
...
@@ -36,6 +39,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
...
@@ -36,6 +39,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp
->
numOfRows
=
htonl
(
input
->
numOfRows
);
rsp
->
numOfRows
=
htonl
(
input
->
numOfRows
);
}
}
void
qwFreeFetchRsp
(
void
*
msg
)
{
void
qwFreeFetchRsp
(
void
*
msg
)
{
if
(
msg
)
{
if
(
msg
)
{
rpcFreeCont
(
msg
);
rpcFreeCont
(
msg
);
...
@@ -44,18 +48,18 @@ void qwFreeFetchRsp(void *msg) {
...
@@ -44,18 +48,18 @@ void qwFreeFetchRsp(void *msg) {
int32_t
qwBuildAndSendQueryRsp
(
SQWConnInfo
*
pConn
,
int32_t
code
)
{
int32_t
qwBuildAndSendQueryRsp
(
SQWConnInfo
*
pConn
,
int32_t
code
)
{
SQueryTableRsp
rsp
=
{.
code
=
code
};
SQueryTableRsp
rsp
=
{.
code
=
code
};
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
msg
,
.
pCont
=
msg
,
.
contLen
=
contLen
,
.
contLen
=
contLen
,
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -68,12 +72,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
...
@@ -68,12 +72,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_RES_READY_RSP
,
.
msgType
=
TDMT_VND_RES_READY_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
NULL
,
.
ahandle
=
NULL
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -81,20 +85,20 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
...
@@ -81,20 +85,20 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwBuildAndSendExplainRsp
(
SQWConnInfo
*
pConn
,
SExplainExecInfo
*
execInfo
,
int32_t
num
)
{
int32_t
qwBuildAndSendExplainRsp
(
SQWConnInfo
*
pConn
,
SExplainExecInfo
*
execInfo
,
int32_t
num
)
{
SExplainRsp
rsp
=
{.
numOfPlans
=
num
,
.
subplanInfo
=
execInfo
};
SExplainRsp
rsp
=
{.
numOfPlans
=
num
,
.
subplanInfo
=
execInfo
};
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSExplainRsp
(
pRsp
,
contLen
,
&
rsp
);
tSerializeSExplainRsp
(
pRsp
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_EXPLAIN_RSP
,
.
msgType
=
TDMT_VND_EXPLAIN_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
contLen
,
.
contLen
=
contLen
,
.
code
=
0
,
.
code
=
0
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -104,16 +108,16 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
...
@@ -104,16 +108,16 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
pStatus
,
int32_t
code
)
{
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
pStatus
,
int32_t
code
)
{
int32_t
contLen
=
tSerializeSSchedulerHbRsp
(
NULL
,
0
,
pStatus
);
int32_t
contLen
=
tSerializeSSchedulerHbRsp
(
NULL
,
0
,
pStatus
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSSchedulerHbRsp
(
pRsp
,
contLen
,
pStatus
);
tSerializeSSchedulerHbRsp
(
pRsp
,
contLen
,
pStatus
);
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT_RSP
,
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
contLen
,
.
contLen
=
contLen
,
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -129,12 +133,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
...
@@ -129,12 +133,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
}
}
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_FETCH_RSP
,
.
msgType
=
TDMT_VND_FETCH_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
)
+
dataLength
,
.
contLen
=
sizeof
(
*
pRsp
)
+
dataLength
,
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -147,12 +151,12 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
...
@@ -147,12 +151,12 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_CANCEL_TASK_RSP
,
.
msgType
=
TDMT_VND_CANCEL_TASK_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -164,12 +168,12 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
...
@@ -164,12 +168,12 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
pRsp
->
code
=
code
;
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_DROP_TASK_RSP
,
.
msgType
=
TDMT_VND_DROP_TASK_RSP
,
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
.
code
=
code
,
};
};
tmsgSendRsp
(
&
rpcRsp
);
tmsgSendRsp
(
&
rpcRsp
);
...
@@ -187,7 +191,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
...
@@ -187,7 +191,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return
-
1
;
return
-
1
;
}
}
col_id_t
cols
=
0
;
col_id_t
cols
=
0
;
SSchema
*
pSchema
=
showRsp
.
tableMeta
.
pSchemas
;
SSchema
*
pSchema
=
showRsp
.
tableMeta
.
pSchemas
;
const
SSchema
*
s
=
tGetTbnameColumnSchema
();
const
SSchema
*
s
=
tGetTbnameColumnSchema
();
...
@@ -231,17 +235,17 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
...
@@ -231,17 +235,17 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwBuildAndSendShowFetchRsp
(
SRpcMsg
*
pMsg
,
SVShowTablesFetchReq
*
pFetchReq
)
{
int32_t
qwBuildAndSendShowFetchRsp
(
SRpcMsg
*
pMsg
,
SVShowTablesFetchReq
*
pFetchReq
)
{
SVShowTablesFetchRsp
*
pRsp
=
(
SVShowTablesFetchRsp
*
)
rpcMallocCont
(
sizeof
(
SVShowTablesFetchRsp
));
SVShowTablesFetchRsp
*
pRsp
=
(
SVShowTablesFetchRsp
*
)
rpcMallocCont
(
sizeof
(
SVShowTablesFetchRsp
));
int32_t
handle
=
htonl
(
pFetchReq
->
id
);
int32_t
handle
=
htonl
(
pFetchReq
->
id
);
pRsp
->
numOfRows
=
0
;
pRsp
->
numOfRows
=
0
;
SRpcMsg
rpcMsg
=
{
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
.
code
=
0
,
};
};
tmsgSendRsp
(
&
rpcMsg
);
tmsgSendRsp
(
&
rpcMsg
);
...
@@ -249,7 +253,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe
...
@@ -249,7 +253,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe
}
}
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SQueryContinueReq
));
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SQueryContinueReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
@@ -261,12 +265,12 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
...
@@ -261,12 +265,12 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
req
->
taskId
=
tId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
SRpcMsg
pNewMsg
=
{
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_CONTINUE
,
.
msgType
=
TDMT_VND_QUERY_CONTINUE
,
.
pCont
=
req
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SQueryContinueReq
),
.
contLen
=
sizeof
(
SQueryContinueReq
),
.
code
=
0
,
.
code
=
0
,
};
};
int32_t
code
=
tmsgPutToQueue
(
&
mgmt
->
msgCb
,
QUERY_QUEUE
,
&
pNewMsg
);
int32_t
code
=
tmsgPutToQueue
(
&
mgmt
->
msgCb
,
QUERY_QUEUE
,
&
pNewMsg
);
...
@@ -281,28 +285,29 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
...
@@ -281,28 +285,29 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
req
->
header
.
vgId
=
htonl
(
mgmt
->
nodeId
);
req
->
header
.
vgId
=
htonl
(
mgmt
->
nodeId
);
req
->
sId
=
htobe64
(
sId
);
req
->
sId
=
htobe64
(
sId
);
req
->
queryId
=
htobe64
(
qId
);
req
->
queryId
=
htobe64
(
qId
);
req
->
taskId
=
htobe64
(
tId
);
req
->
taskId
=
htobe64
(
tId
);
req
->
refId
=
htobe64
(
rId
);
req
->
refId
=
htobe64
(
rId
);
SRpcMsg
pMsg
=
{
SRpcMsg
pMsg
=
{
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
msgType
=
TDMT_VND_DROP_TASK
,
.
msgType
=
TDMT_VND_DROP_TASK
,
.
pCont
=
req
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
STaskDropReq
),
.
contLen
=
sizeof
(
STaskDropReq
),
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
};
};
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -328,41 +333,43 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
...
@@ -328,41 +333,43 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo
taosMemoryFree
(
msg
);
taosMemoryFree
(
msg
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SRpcMsg
pMsg
=
{
SRpcMsg
pMsg
=
{
.
handle
=
pConn
->
handle
,
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
ahandle
=
pConn
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT
,
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT
,
.
pCont
=
msg
,
.
pCont
=
msg
,
.
contLen
=
msgSize
,
.
contLen
=
msgSize
,
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
};
};
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
phyLen
=
ntohl
(
msg
->
phyLen
);
msg
->
phyLen
=
ntohl
(
msg
->
phyLen
);
msg
->
sqlLen
=
ntohl
(
msg
->
sqlLen
);
msg
->
sqlLen
=
ntohl
(
msg
->
sqlLen
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
qId
=
msg
->
queryId
;
...
@@ -373,7 +380,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -373,7 +380,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, handle:%p, sql:%s"
,
node
,
pMsg
->
handle
,
sql
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, handle:%p, sql:%s"
,
node
,
pMsg
->
handle
,
sql
);
taosMemoryFreeClear
(
sql
);
taosMemoryFreeClear
(
sql
);
...
@@ -381,17 +388,17 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -381,17 +388,17 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p"
,
node
);
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int8_t
status
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
@@ -401,7 +408,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -401,7 +408,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t
sId
=
msg
->
sId
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
int64_t
rId
=
0
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
...
@@ -413,10 +420,10 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -413,10 +420,10 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG
(
"processCQuery end, node:%p"
,
node
);
QW_SCH_TASK_DLOG
(
"processCQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
...
@@ -426,7 +433,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -426,7 +433,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task ready msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid task ready msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
...
@@ -435,7 +442,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -435,7 +442,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t
sId
=
msg
->
sId
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
int64_t
rId
=
0
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
...
@@ -446,7 +453,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -446,7 +453,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
qwProcessReady
(
QW_FPARAMS
(),
&
qwMsg
));
QW_ERR_RET
(
qwProcessReady
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processReady end, node:%p"
,
node
);
QW_SCH_TASK_DLOG
(
"processReady end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -455,24 +462,24 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -455,24 +462,24 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchTasksStatusReq
*
msg
=
pMsg
->
pCont
;
SSchTasksStatusReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
sId
=
htobe64
(
msg
->
sId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
sId
=
msg
->
sId
;
SSchedulerStatusRsp
*
sStatus
=
NULL
;
SSchedulerStatusRsp
*
sStatus
=
NULL
;
//
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
_return:
//
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
//QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -484,11 +491,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -484,11 +491,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid fetch msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid fetch msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
...
@@ -497,7 +504,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -497,7 +504,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t
sId
=
msg
->
sId
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
int64_t
rId
=
0
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
...
@@ -509,7 +516,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -509,7 +516,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG
(
"processFetch end, node:%p"
,
node
);
QW_SCH_TASK_DLOG
(
"processFetch end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qWorkerProcessFetchRsp
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
qWorkerProcessFetchRsp
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
...
@@ -522,13 +529,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -522,13 +529,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
int32_t
code
=
0
;
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
qError
(
"invalid task cancel msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
...
@@ -544,7 +551,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -544,7 +551,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
//
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
_return:
...
@@ -559,14 +566,14 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -559,14 +566,14 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
...
@@ -583,7 +590,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -583,7 +590,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
QW_SCH_TASK_DLOG
(
"receive drop task due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
QW_SCH_TASK_DLOG
(
"receive drop task due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
}
}
QW_SCH_TASK_DLOG
(
"processDrop start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
QW_SCH_TASK_DLOG
(
"processDrop start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
...
@@ -600,14 +607,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -600,14 +607,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchedulerHbReq
req
=
{
0
};
SSchedulerHbReq
req
=
{
0
};
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
pMsg
->
pCont
)
{
if
(
NULL
==
pMsg
->
pCont
)
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
if
(
tDeserializeSSchedulerHbReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
req
))
{
if
(
tDeserializeSSchedulerHbReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
req
))
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
...
@@ -616,12 +623,12 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -616,12 +623,12 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
}
uint64_t
sId
=
req
.
sId
;
uint64_t
sId
=
req
.
sId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
QW_SCH_DLOG
(
"receive Hb msg due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
QW_SCH_DLOG
(
"receive Hb msg due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
}
}
QW_SCH_DLOG
(
"processHb start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
QW_SCH_DLOG
(
"processHb start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
...
@@ -638,7 +645,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -638,7 +645,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SVShowTablesReq
*
pReq
=
pMsg
->
pCont
;
SVShowTablesReq
*
pReq
=
pMsg
->
pCont
;
QW_RET
(
qwBuildAndSendShowRsp
(
pMsg
,
code
));
QW_RET
(
qwBuildAndSendShowRsp
(
pMsg
,
code
));
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录