Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b0c9843
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1b0c9843
编写于
11月 17, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: refact query message
上级
fb861a39
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
326 addition
and
125 deletion
+326
-125
include/common/tmsg.h
include/common/tmsg.h
+14
-4
source/common/src/tmsg.c
source/common/src/tmsg.c
+135
-0
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+106
-87
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+23
-6
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+48
-28
未找到文件。
include/common/tmsg.h
浏览文件 @
1b0c9843
...
...
@@ -1615,15 +1615,21 @@ typedef struct SSubQueryMsg {
uint64_t
taskId
;
int64_t
refId
;
int32_t
execId
;
int32_t
msgMask
;
int8_t
taskType
;
int8_t
explain
;
int8_t
needFetch
;
uint32_t
sqlLen
;
// the query sql,
uint32_t
phyLen
;
int32_t
msgMask
;
char
msg
[]
;
uint32_t
sqlLen
;
char
*
sql
;
uint32_t
msgLen
;
char
*
msg
;
}
SSubQueryMsg
;
int32_t
tSerializeSSubQueryMsg
(
void
*
buf
,
int32_t
bufLen
,
SSubQueryMsg
*
pReq
);
int32_t
tDeserializeSSubQueryMsg
(
void
*
buf
,
int32_t
bufLen
,
SSubQueryMsg
*
pReq
);
void
tFreeSSubQueryMsg
(
SSubQueryMsg
*
pReq
);
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
...
...
@@ -1732,6 +1738,10 @@ typedef struct {
int32_t
execId
;
}
STaskDropReq
;
int32_t
tSerializeSTaskDropReq
(
void
*
buf
,
int32_t
bufLen
,
STaskDropReq
*
pReq
);
int32_t
tDeserializeSTaskDropReq
(
void
*
buf
,
int32_t
bufLen
,
STaskDropReq
*
pReq
);
typedef
struct
{
int32_t
code
;
}
STaskDropRsp
;
...
...
source/common/src/tmsg.c
浏览文件 @
1b0c9843
...
...
@@ -4643,6 +4643,141 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
return
0
;
}
int32_t
tSerializeSSubQueryMsg
(
void
*
buf
,
int32_t
bufLen
,
SSubQueryMsg
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
if
(
buf
!=
NULL
)
{
buf
=
(
char
*
)
buf
+
headLen
;
bufLen
-=
headLen
;
}
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
execId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
msgMask
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
taskType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
explain
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
needFetch
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pReq
->
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeCStrWithLen
(
&
encoder
,
pReq
->
sql
,
pReq
->
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pReq
->
msgLen
)
<
0
)
return
-
1
;
if
(
tEncodeCStrWithLen
(
&
encoder
,
pReq
->
msg
,
pReq
->
msgLen
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
if
(
buf
!=
NULL
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)((
char
*
)
buf
-
headLen
);
pHead
->
vgId
=
htonl
(
pReq
->
header
.
vgId
);
pHead
->
contLen
=
htonl
(
tlen
+
headLen
);
}
return
tlen
+
headLen
;
}
int32_t
tDeserializeSSubQueryMsg
(
void
*
buf
,
int32_t
bufLen
,
SSubQueryMsg
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
SMsgHead
*
pHead
=
buf
;
pHead
->
vgId
=
pReq
->
header
.
vgId
;
pHead
->
contLen
=
pReq
->
header
.
contLen
;
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
(
char
*
)
buf
+
headLen
,
bufLen
-
headLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
queryId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
execId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
msgMask
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
taskType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
explain
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
needFetch
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pReq
->
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
&
decoder
,
&
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pReq
->
msgLen
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
&
decoder
,
&
pReq
->
msg
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
void
tFreeSSubQueryMsg
(
SSubQueryMsg
*
pReq
)
{
if
(
NULL
==
pReq
)
{
return
;
}
taosMemoryFreeClear
(
pReq
->
sql
);
taosMemoryFreeClear
(
pReq
->
msg
);
}
int32_t
tSerializeSTaskDropReq
(
void
*
buf
,
int32_t
bufLen
,
STaskDropReq
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
if
(
buf
!=
NULL
)
{
buf
=
(
char
*
)
buf
+
headLen
;
bufLen
-=
headLen
;
}
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
execId
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
if
(
buf
!=
NULL
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)((
char
*
)
buf
-
headLen
);
pHead
->
vgId
=
htonl
(
pReq
->
header
.
vgId
);
pHead
->
contLen
=
htonl
(
tlen
+
headLen
);
}
return
tlen
+
headLen
;
}
int32_t
tDeserializeSTaskDropReq
(
void
*
buf
,
int32_t
bufLen
,
STaskDropReq
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
SMsgHead
*
pHead
=
buf
;
pHead
->
vgId
=
pReq
->
header
.
vgId
;
pHead
->
contLen
=
pReq
->
header
.
contLen
;
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
(
char
*
)
buf
+
headLen
,
bufLen
-
headLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
sId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
queryId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
refId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
execId
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSSchedulerHbReq
(
void
*
buf
,
int32_t
bufLen
,
SSchedulerHbReq
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
1b0c9843
...
...
@@ -182,23 +182,37 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
#endif
int32_t
qwBuildAndSendDropMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
STaskDropReq
qMsg
;
qMsg
.
header
.
vgId
=
mgmt
->
nodeId
;
qMsg
.
header
.
contLen
=
0
;
qMsg
.
sId
=
sId
;
qMsg
.
queryId
=
qId
;
qMsg
.
taskId
=
tId
;
qMsg
.
refId
=
rId
;
qMsg
.
execId
=
eId
;
int32_t
msgSize
=
tSerializeSTaskDropReq
(
NULL
,
0
,
&
qMsg
);
if
(
msgSize
<
0
)
{
QW_SCH_TASK_ELOG
(
"tSerializeSTaskDropReq get size, msgSize:%d"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
req
->
refId
=
rId
;
req
->
execId
=
eId
;
void
*
msg
=
rpcMallocCont
(
msgSize
);
if
(
NULL
==
msg
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
tSerializeSTaskDropReq
(
msg
,
msgSize
,
&
qMsg
)
<
0
)
{
QW_SCH_TASK_ELOG
(
"tSerializeSTaskDropReq failed, msgSize:%d"
,
msgSize
);
rpcFreeCont
(
msg
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SRpcMsg
pNewMsg
=
{
.
msgType
=
TDMT_SCH_DROP_TASK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
STaskDropReq
)
,
.
pCont
=
msg
,
.
contLen
=
msgSize
,
.
code
=
0
,
.
info
=
*
pConn
,
};
...
...
@@ -247,22 +261,37 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
}
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
STaskDropReq
qMsg
;
qMsg
.
header
.
vgId
=
mgmt
->
nodeId
;
qMsg
.
header
.
contLen
=
0
;
qMsg
.
sId
=
sId
;
qMsg
.
queryId
=
qId
;
qMsg
.
taskId
=
tId
;
qMsg
.
refId
=
rId
;
qMsg
.
execId
=
eId
;
int32_t
msgSize
=
tSerializeSTaskDropReq
(
NULL
,
0
,
&
qMsg
);
if
(
msgSize
<
0
)
{
QW_SCH_TASK_ELOG
(
"tSerializeSTaskDropReq get size, msgSize:%d"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
htonl
(
mgmt
->
nodeId
);
req
->
sId
=
htobe64
(
sId
);
req
->
queryId
=
htobe64
(
qId
);
req
->
taskId
=
htobe64
(
tId
);
req
->
refId
=
htobe64
(
rId
);
void
*
msg
=
rpcMallocCont
(
msgSize
);
if
(
NULL
==
msg
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
tSerializeSTaskDropReq
(
msg
,
msgSize
,
&
qMsg
)
<
0
)
{
QW_SCH_TASK_ELOG
(
"tSerializeSTaskDropReq failed, msgSize:%d"
,
msgSize
);
rpcFreeCont
(
msg
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SRpcMsg
brokenMsg
=
{
.
msgType
=
TDMT_SCH_DROP_TASK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
STaskDropReq
)
,
.
pCont
=
msg
,
.
contLen
=
msgSize
,
.
code
=
TSDB_CODE_RPC_BROKEN_LINK
,
.
info
=
*
pConn
,
};
...
...
@@ -312,40 +341,31 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
)
)
{
QW_ELOG
(
"
invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
SSubQueryMsg
msg
=
{
0
};
if
(
tDeserializeSSubQueryMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
msg
)
<
0
)
{
QW_ELOG
(
"
tDeserializeSSubQueryMsg failed, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
msg
->
phyLen
=
ntohl
(
msg
->
phyLen
);
msg
->
sqlLen
=
ntohl
(
msg
->
sqlLen
);
msg
->
msgMask
=
ntohl
(
msg
->
msgMask
);
if
(
chkGrant
&&
(
!
TEST_SHOW_REWRITE_MASK
(
msg
->
msgMask
))
&&
(
grantCheck
(
TSDB_GRANT_TIME
)
!=
TSDB_CODE_SUCCESS
))
{
QW_ELOG
(
"query failed cause of grant expired, msgMask:%d"
,
msg
->
msgMask
);
if
(
chkGrant
&&
(
!
TEST_SHOW_REWRITE_MASK
(
msg
.
msgMask
))
&&
(
grantCheck
(
TSDB_GRANT_TIME
)
!=
TSDB_CODE_SUCCESS
))
{
QW_ELOG
(
"query failed cause of grant expired, msgMask:%d"
,
msg
.
msgMask
);
tFreeSSubQueryMsg
(
&
msg
);
QW_ERR_RET
(
TSDB_CODE_GRANT_EXPIRED
);
}
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
uint64_t
sId
=
msg
.
sId
;
uint64_t
qId
=
msg
.
queryId
;
uint64_t
tId
=
msg
.
taskId
;
int64_t
rId
=
msg
.
refId
;
int32_t
eId
=
msg
.
execId
;
SQWMsg
qwMsg
=
{
.
msgType
=
pMsg
->
msgType
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phy
Len
,
.
connInfo
=
pMsg
->
info
};
.
msgType
=
pMsg
->
msgType
,
.
msg
=
msg
.
msg
,
.
msgLen
=
msg
.
msg
Len
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"prerocessQuery start, handle:%p
"
,
pMsg
->
info
.
handle
);
QW_ERR_RET
(
qwPreprocessQuery
(
QW_FPARAMS
(),
&
qwMsg
)
);
QW_SCH_TASK_DLOG
(
"prerocessQuery end, handle:%p
"
,
pMsg
->
info
.
handl
e
);
QW_SCH_TASK_DLOG
(
"prerocessQuery start, handle:%p
, SQL:%s"
,
pMsg
->
info
.
handle
,
msg
.
sql
);
code
=
qwPreprocessQuery
(
QW_FPARAMS
(),
&
qwMsg
);
QW_SCH_TASK_DLOG
(
"prerocessQuery end, handle:%p
, code:%x"
,
pMsg
->
info
.
handle
,
cod
e
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -355,19 +375,25 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SSubQueryMsg
msg
=
{
0
};
if
(
tDeserializeSSubQueryMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
msg
)
<
0
)
{
QW_ELOG
(
"tDeserializeSSubQueryMsg failed, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
uint64_t
sId
=
msg
.
sId
;
uint64_t
qId
=
msg
.
queryId
;
uint64_t
tId
=
msg
.
taskId
;
int64_t
rId
=
msg
.
refId
;
int32_t
eId
=
msg
.
execId
;
QW_SCH_TASK_DLOG
(
"Abort prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
qwAbortPrerocessQuery
(
QW_FPARAMS
());
QW_SCH_TASK_DLOG
(
"Abort prerocessQuery end, handle:%p"
,
pMsg
->
info
.
handle
);
tFreeSSubQueryMsg
(
&
msg
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -377,42 +403,41 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
queryProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
SSubQueryMsg
msg
=
{
0
};
if
(
tDeserializeSSubQueryMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
msg
)
<
0
)
{
QW_ELOG
(
"tDeserializeSSubQueryMsg failed, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
uint64_t
sId
=
msg
.
sId
;
uint64_t
qId
=
msg
.
queryId
;
uint64_t
tId
=
msg
.
taskId
;
int64_t
rId
=
msg
.
refId
;
int32_t
eId
=
msg
.
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phy
Len
,
.
msg
=
msg
.
msg
,
.
msgLen
=
msg
.
msg
Len
,
.
connInfo
=
pMsg
->
info
,
.
msgType
=
pMsg
->
msgType
};
qwMsg
.
msgInfo
.
explain
=
msg
->
explain
;
qwMsg
.
msgInfo
.
taskType
=
msg
->
taskType
;
qwMsg
.
msgInfo
.
needFetch
=
msg
->
needFetch
;
qwMsg
.
msgInfo
.
explain
=
msg
.
explain
;
qwMsg
.
msgInfo
.
taskType
=
msg
.
taskType
;
qwMsg
.
msgInfo
.
needFetch
=
msg
.
needFetch
;
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, type:%s, handle:%p, SQL:%s"
,
node
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
,
sql
);
QW_ERR_JRET
(
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
,
sql
));
pMsg
->
info
.
handle
,
msg
.
sql
);
code
=
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
,
msg
.
sql
);
msg
.
sql
=
NULL
;
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p, code:%x"
,
node
,
code
);
_return:
tFreeSSubQueryMsg
(
&
msg
);
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p, code:%d"
,
node
,
code
);
return
code
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
...
...
@@ -548,28 +573,22 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
}
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
dropProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
STaskDropReq
msg
=
{
0
};
if
(
tDeserializeSTaskDropReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
msg
)
<
0
)
{
QW_ELOG
(
"tDeserializeSTaskDropReq failed, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
uint64_t
sId
=
msg
.
sId
;
uint64_t
qId
=
msg
.
queryId
;
uint64_t
tId
=
msg
.
taskId
;
int64_t
rId
=
msg
.
refId
;
int32_t
eId
=
msg
.
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
,
.
connInfo
=
pMsg
->
info
};
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
1b0c9843
...
...
@@ -114,7 +114,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg
.
queryId
=
htobe64
(
atomic_add_fetch_64
(
&
qwtTestQueryId
,
1
));
qwtqueryMsg
.
sId
=
htobe64
(
1
);
qwtqueryMsg
.
taskId
=
htobe64
(
1
);
qwtqueryMsg
.
phy
Len
=
htonl
(
100
);
qwtqueryMsg
.
msg
Len
=
htonl
(
100
);
qwtqueryMsg
.
sqlLen
=
0
;
queryRpc
->
msgType
=
TDMT_SCH_QUERY
;
queryRpc
->
pCont
=
&
qwtqueryMsg
;
...
...
@@ -131,12 +131,29 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
}
void
qwtBuildDropReqMsg
(
STaskDropReq
*
dropMsg
,
SRpcMsg
*
dropRpc
)
{
dropMsg
->
sId
=
htobe64
(
1
);
dropMsg
->
queryId
=
htobe64
(
atomic_load_64
(
&
qwtTestQueryId
));
dropMsg
->
taskId
=
htobe64
(
1
);
dropMsg
->
sId
=
1
;
dropMsg
->
queryId
=
atomic_load_64
(
&
qwtTestQueryId
);
dropMsg
->
taskId
=
1
;
int32_t
msgSize
=
tSerializeSTaskDropReq
(
NULL
,
0
,
dropMsg
);
if
(
msgSize
<
0
)
{
return
;
}
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
return
;
}
if
(
tSerializeSTaskDropReq
(
msg
,
msgSize
,
dropMsg
)
<
0
)
{
taosMemoryFree
(
msg
);
return
;
}
dropRpc
->
msgType
=
TDMT_SCH_DROP_TASK
;
dropRpc
->
pCont
=
dropM
sg
;
dropRpc
->
contLen
=
sizeof
(
STaskDropReq
)
;
dropRpc
->
pCont
=
m
sg
;
dropRpc
->
contLen
=
msgSize
;
}
int32_t
qwtStringToPlan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
1b0c9843
...
...
@@ -1042,30 +1042,40 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case
TDMT_SCH_MERGE_QUERY
:
{
SCH_ERR_RET
(
schMakeQueryRpcCtx
(
pJob
,
pTask
,
&
rpcCtx
));
uint32_t
len
=
strlen
(
pJob
->
sql
);
msgSize
=
sizeof
(
SSubQueryMsg
)
+
pTask
->
msgLen
+
len
;
SSubQueryMsg
qMsg
;
qMsg
.
header
.
vgId
=
addr
->
nodeId
;
qMsg
.
header
.
contLen
=
0
;
qMsg
.
sId
=
schMgmt
.
sId
;
qMsg
.
queryId
=
pJob
->
queryId
;
qMsg
.
taskId
=
pTask
->
taskId
;
qMsg
.
refId
=
pJob
->
refId
;
qMsg
.
execId
=
pTask
->
execId
;
qMsg
.
msgMask
=
(
pTask
->
plan
->
showRewrite
)
?
QUERY_MSG_MASK_SHOW_REWRITE
()
:
0
;
qMsg
.
taskType
=
TASK_TYPE_TEMP
;
qMsg
.
explain
=
SCH_IS_EXPLAIN_JOB
(
pJob
);
qMsg
.
needFetch
=
SCH_TASK_NEED_FETCH
(
pTask
);
qMsg
.
sqlLen
=
strlen
(
pJob
->
sql
);
qMsg
.
sql
=
pJob
->
sql
;
qMsg
.
msgLen
=
pTask
->
msgLen
;
qMsg
.
msg
=
pTask
->
msg
;
msgSize
=
tSerializeSSubQueryMsg
(
NULL
,
0
,
&
qMsg
);
if
(
msgSize
<
0
)
{
SCH_TASK_ELOG
(
"tSerializeSSubQueryMsg get size, msgSize:%d"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
execId
=
htonl
(
pTask
->
execId
);
pMsg
->
taskType
=
TASK_TYPE_TEMP
;
pMsg
->
explain
=
SCH_IS_EXPLAIN_JOB
(
pJob
);
pMsg
->
needFetch
=
SCH_TASK_NEED_FETCH
(
pTask
);
pMsg
->
phyLen
=
htonl
(
pTask
->
msgLen
);
pMsg
->
sqlLen
=
htonl
(
len
);
pMsg
->
msgMask
=
htonl
((
pTask
->
plan
->
showRewrite
)
?
QUERY_MSG_MASK_SHOW_REWRITE
()
:
0
);
memcpy
(
pMsg
->
msg
,
pJob
->
sql
,
len
);
memcpy
(
pMsg
->
msg
+
len
,
pTask
->
msg
,
pTask
->
msgLen
);
if
(
tSerializeSSubQueryMsg
(
msg
,
msgSize
,
&
qMsg
)
<
0
)
{
SCH_TASK_ELOG
(
"tSerializeSSubQueryMsg failed, msgSize:%d"
,
msgSize
);
taosMemoryFree
(
msg
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
persistHandle
=
true
;
SCH_SET_TASK_HANDLE
(
pTask
,
rpcAllocHandle
());
...
...
@@ -1092,22 +1102,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break
;
}
case
TDMT_SCH_DROP_TASK
:
{
msgSize
=
sizeof
(
STaskDropReq
);
STaskDropReq
qMsg
;
qMsg
.
header
.
vgId
=
addr
->
nodeId
;
qMsg
.
header
.
contLen
=
0
;
qMsg
.
sId
=
schMgmt
.
sId
;
qMsg
.
queryId
=
pJob
->
queryId
;
qMsg
.
taskId
=
pTask
->
taskId
;
qMsg
.
refId
=
pJob
->
refId
;
qMsg
.
execId
=
pTask
->
execId
;
msgSize
=
tSerializeSTaskDropReq
(
NULL
,
0
,
&
qMsg
);
if
(
msgSize
<
0
)
{
SCH_TASK_ELOG
(
"tSerializeSTaskDropReq get size, msgSize:%d"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskDropReq
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
execId
=
htonl
(
pTask
->
execId
);
if
(
tSerializeSTaskDropReq
(
msg
,
msgSize
,
&
qMsg
)
<
0
)
{
SCH_TASK_ELOG
(
"tSerializeSTaskDropReq failed, msgSize:%d"
,
msgSize
);
taosMemoryFree
(
msg
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
break
;
}
case
TDMT_SCH_QUERY_HEARTBEAT
:
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录