Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8a033aca
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8a033aca
编写于
10月 13, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code format
上级
c4b3da50
变更
11
展开全部
隐藏空白更改
内联
并排
Showing
11 changed file
with
478 addition
and
504 deletion
+478
-504
source/libs/qcom/inc/queryInt.h
source/libs/qcom/inc/queryInt.h
+0
-1
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+58
-59
source/libs/qcom/test/queryTest.cpp
source/libs/qcom/test/queryTest.cpp
+6
-5
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+63
-55
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+5
-4
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+23
-23
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+37
-33
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+12
-15
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+26
-26
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+246
-281
tools/scripts/codeFormat.sh
tools/scripts/codeFormat.sh
+2
-2
未找到文件。
source/libs/qcom/inc/queryInt.h
浏览文件 @
8a033aca
...
...
@@ -20,7 +20,6 @@
extern
"C"
{
#endif
#ifdef __cplusplus
}
#endif
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
8a033aca
...
...
@@ -13,18 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tmsg.h"
#include "queryInt.h"
#include "query.h"
#include "
trpc
.h"
#include "
queryInt
.h"
#include "systable.h"
#include "tmsg.h"
#include "trpc.h"
#pragma GCC diagnostic push
#ifdef COMPILER_SUPPORTS_CXX13
#pragma GCC diagnostic ignored "-Wformat-truncation"
#endif
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallocFp
)(
int32_t
))
=
{
0
};
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallocFp
)(
int32_t
))
=
{
0
};
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
=
{
0
};
int32_t
queryBuildUseDbOutput
(
SUseDbOutput
*
pOut
,
SUseDbRsp
*
usedbRsp
)
{
...
...
@@ -46,7 +47,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
if
(
usedbRsp
->
vgNum
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pOut
->
dbVgroup
->
vgHash
=
taosHashInit
(
usedbRsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pOut
->
dbVgroup
->
vgHash
)
{
...
...
@@ -57,7 +58,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
usedbRsp
->
pVgroupInfos
,
i
);
pOut
->
dbVgroup
->
numOfTable
+=
pVgInfo
->
numOfTable
;
qDebug
(
"the %dth vgroup, id %d, epNum %d, current %s port %d"
,
i
,
pVgInfo
->
vgId
,
pVgInfo
->
epSet
.
numOfEps
,
pVgInfo
->
epSet
.
eps
[
pVgInfo
->
epSet
.
inUse
].
fqdn
,
pVgInfo
->
epSet
.
eps
[
pVgInfo
->
epSet
.
inUse
].
port
);
pVgInfo
->
epSet
.
eps
[
pVgInfo
->
epSet
.
inUse
].
fqdn
,
pVgInfo
->
epSet
.
eps
[
pVgInfo
->
epSet
.
inUse
].
port
);
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
->
vgHash
,
&
pVgInfo
->
vgId
,
sizeof
(
int32_t
),
pVgInfo
,
sizeof
(
SVgroupInfo
)))
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -66,7 +67,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
SBuildTableInput
*
pInput
=
input
;
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -89,7 +91,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
SBuildUseDBInput
*
pInput
=
input
;
if
(
NULL
==
pInput
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -112,7 +114,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildQnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildQnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -130,7 +132,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildDnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildDnodeListMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -148,7 +150,7 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetSerVerMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetSerVerMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -165,8 +167,7 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetDBCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetDBCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -184,7 +185,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -202,7 +203,8 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildRetrieveFuncMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildRetrieveFuncMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -225,7 +227,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetUserAuthMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetUserAuthMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -243,7 +245,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetTbIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetTbIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
...
...
@@ -261,13 +263,13 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetTbCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
int32_t
queryBuildGetTbCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SBuildTableInput
*
pInput
=
input
;
STableCfgReq
cfgReq
=
{
0
};
STableCfgReq
cfgReq
=
{
0
};
cfgReq
.
header
.
vgId
=
pInput
->
vgId
;
strcpy
(
cfgReq
.
dbFName
,
pInput
->
dbFName
);
strcpy
(
cfgReq
.
tbName
,
pInput
->
tbName
);
...
...
@@ -282,7 +284,6 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SUseDbOutput
*
pOut
=
output
;
SUseDbRsp
usedbRsp
=
{
0
};
...
...
@@ -362,13 +363,12 @@ int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) {
pMeta
->
uid
=
msg
->
tuid
;
pMeta
->
suid
=
msg
->
suid
;
qDebug
(
"ctable %s uid %"
PRIx64
" meta returned, type %d vgId:%d db %s suid %"
PRIx64
,
msg
->
tbName
,
pMeta
->
uid
,
pMeta
->
tableType
,
pMeta
->
vgId
,
msg
->
dbFName
,
pMeta
->
suid
);
qDebug
(
"ctable %s uid %"
PRIx64
" meta returned, type %d vgId:%d db %s suid %"
PRIx64
,
msg
->
tbName
,
pMeta
->
uid
,
pMeta
->
tableType
,
pMeta
->
vgId
,
msg
->
dbFName
,
pMeta
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryCreateTableMetaFromMsg
(
STableMetaRsp
*
msg
,
bool
isStb
,
STableMeta
**
pMeta
)
{
int32_t
total
=
msg
->
numOfColumns
+
msg
->
numOfTags
;
int32_t
metaSize
=
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
total
;
...
...
@@ -425,7 +425,8 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
goto
PROCESS_META_OVER
;
}
if
(
0
!=
strcmp
(
metaRsp
.
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
)
&&
!
tIsValidSchema
(
metaRsp
.
pSchemas
,
metaRsp
.
numOfColumns
,
metaRsp
.
numOfTags
))
{
if
(
0
!=
strcmp
(
metaRsp
.
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
)
&&
!
tIsValidSchema
(
metaRsp
.
pSchemas
,
metaRsp
.
numOfColumns
,
metaRsp
.
numOfTags
))
{
code
=
TSDB_CODE_TSC_INVALID_VALUE
;
goto
PROCESS_META_OVER
;
}
...
...
@@ -461,7 +462,6 @@ PROCESS_META_OVER:
return
code
;
}
int32_t
queryProcessQnodeListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SQnodeListRsp
out
=
{
0
};
int32_t
code
=
0
;
...
...
@@ -496,7 +496,7 @@ int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
return
code
;
}
*
(
SArray
**
)
output
=
out
.
dnodeList
;
*
(
SArray
**
)
output
=
out
.
dnodeList
;
return
code
;
}
...
...
@@ -516,12 +516,11 @@ int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
return
code
;
}
*
(
char
**
)
output
=
strdup
(
out
.
ver
);
*
(
char
**
)
output
=
strdup
(
out
.
ver
);
return
code
;
}
int32_t
queryProcessGetDbCfgRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SDbCfgRsp
out
=
{
0
};
...
...
@@ -573,7 +572,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
return
TSDB_CODE_INVALID_MSG
;
}
SFuncInfo
*
funcInfo
=
taosArrayGet
(
out
.
pFuncInfos
,
0
);
SFuncInfo
*
funcInfo
=
taosArrayGet
(
out
.
pFuncInfos
,
0
);
memcpy
(
output
,
funcInfo
,
sizeof
(
*
funcInfo
));
taosArrayDestroy
(
out
.
pFuncInfos
);
...
...
@@ -599,12 +598,12 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
STableIndexRsp
*
out
=
(
STableIndexRsp
*
)
output
;
STableIndexRsp
*
out
=
(
STableIndexRsp
*
)
output
;
if
(
tDeserializeSTableIndexRsp
(
msg
,
msgSize
,
out
)
!=
0
)
{
qError
(
"tDeserializeSTableIndexRsp failed, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_INVALID_MSG
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -619,39 +618,39 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
return
TSDB_CODE_INVALID_MSG
;
}
*
(
STableCfgRsp
**
)
output
=
out
;
*
(
STableCfgRsp
**
)
output
=
out
;
return
TSDB_CODE_SUCCESS
;
}
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryBuildUseDbMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryBuildQnodeListMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_DNODE_LIST
)]
=
queryBuildDnodeListMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryBuildGetDBCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryBuildGetIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryBuildRetrieveFuncMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryBuildGetUserAuthMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryBuildGetTbIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_CFG
)]
=
queryBuildGetTbCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_TABLE_CFG
)]
=
queryBuildGetTbCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_SERVER_VERSION
)]
=
queryBuildGetSerVerMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryProcessUseDBRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryProcessQnodeListRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_DNODE_LIST
)]
=
queryProcessDnodeListRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryProcessGetDbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryProcessGetIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryProcessRetrieveFuncRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryProcessGetUserAuthRsp
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryBuildUseDbMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryBuildQnodeListMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_DNODE_LIST
)]
=
queryBuildDnodeListMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryBuildGetDBCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryBuildGetIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryBuildRetrieveFuncMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryBuildGetUserAuthMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryBuildGetTbIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_CFG
)]
=
queryBuildGetTbCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_TABLE_CFG
)]
=
queryBuildGetTbCfgMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_SERVER_VERSION
)]
=
queryBuildGetSerVerMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryProcessUseDBRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryProcessQnodeListRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_DNODE_LIST
)]
=
queryProcessDnodeListRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryProcessGetDbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryProcessGetIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryProcessRetrieveFuncRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryProcessGetUserAuthRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryProcessGetTbIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_SERVER_VERSION
)]
=
queryProcessGetSerVerRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_CFG
)]
=
queryProcessGetTbCfgRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_SERVER_VERSION
)]
=
queryProcessGetSerVerRsp
;
}
#pragma GCC diagnostic pop
source/libs/qcom/test/queryTest.cpp
浏览文件 @
8a033aca
...
...
@@ -15,8 +15,9 @@
#include <gtest/gtest.h>
#include <iostream>
#include "tmsg.h"
#include "query.h"
#include "tmsg.h"
#include "trpc.h"
#pragma GCC diagnostic push
...
...
@@ -37,7 +38,7 @@ int32_t testPrint(void* p) {
}
int32_t
testPrintError
(
void
*
p
)
{
SParam
*
param
=
(
SParam
*
)
p
;
SParam
*
param
=
(
SParam
*
)
p
;
taosMemoryFreeClear
(
p
);
return
-
1
;
...
...
@@ -67,8 +68,8 @@ TEST(testCase, async_task_test) {
}
TEST
(
testCase
,
many_async_task_test
)
{
for
(
int32_t
i
=
0
;
i
<
50
;
++
i
)
{
SParam
*
p
=
(
SParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SParam
));
for
(
int32_t
i
=
0
;
i
<
50
;
++
i
)
{
SParam
*
p
=
(
SParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SParam
));
p
->
v
=
i
;
taosAsyncExec
(
testPrint
,
p
,
NULL
);
}
...
...
@@ -78,7 +79,7 @@ TEST(testCase, many_async_task_test) {
TEST
(
testCase
,
error_in_async_test
)
{
int32_t
code
=
0
;
SParam
*
p
=
(
SParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SParam
));
SParam
*
p
=
(
SParam
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SParam
));
taosAsyncExec
(
testPrintError
,
p
,
&
code
);
taosMsleep
(
1
);
printf
(
"Error code:%d after asynchronously exec function
\n
"
,
code
);
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
8a033aca
...
...
@@ -20,22 +20,22 @@
extern
"C"
{
#endif
#include "executor.h"
#include "osDef.h"
#include "plannodes.h"
#include "qworker.h"
#include "tlockfree.h"
#include "ttimer.h"
#include "tref.h"
#include "plannodes.h"
#include "executor.h"
#include "trpc.h"
#include "ttimer.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000
#define QW_MIN_RES_ROWS 4096
#define QW_SCH_TIMEOUT_MSEC
180000
#define QW_MIN_RES_ROWS
4096
enum
{
QW_PHASE_PRE_QUERY
=
1
,
...
...
@@ -128,16 +128,16 @@ typedef struct SQWTaskCtx {
bool
queryContinue
;
bool
queryInQueue
;
int32_t
rspCode
;
int64_t
affectedRows
;
// for insert ...select stmt
int64_t
affectedRows
;
// for insert ...select stmt
SRpcHandleInfo
ctrlConnInfo
;
SRpcHandleInfo
dataConnInfo
;
int8_t
events
[
QW_EVENT_MAX
];
SArray
*
explainRes
;
void
*
taskHandle
;
void
*
sinkHandle
;
SArray
*
explainRes
;
void
*
taskHandle
;
void
*
sinkHandle
;
STbVerInfo
tbInfo
;
}
SQWTaskCtx
;
...
...
@@ -157,14 +157,14 @@ typedef struct SQWTimeInQ {
typedef
struct
SQWMsgStat
{
SQWTimeInQ
waitTime
[
2
];
uint64_t
queryProcessed
;
uint64_t
cqueryProcessed
;
uint64_t
fetchProcessed
;
uint64_t
rspProcessed
;
uint64_t
cancelProcessed
;
uint64_t
dropProcessed
;
uint64_t
hbProcessed
;
uint64_t
deleteProcessed
;
uint64_t
queryProcessed
;
uint64_t
cqueryProcessed
;
uint64_t
fetchProcessed
;
uint64_t
rspProcessed
;
uint64_t
cancelProcessed
;
uint64_t
dropProcessed
;
uint64_t
hbProcessed
;
uint64_t
deleteProcessed
;
}
SQWMsgStat
;
typedef
struct
SQWRTStat
{
...
...
@@ -173,8 +173,8 @@ typedef struct SQWRTStat {
}
SQWRTStat
;
typedef
struct
SQWStat
{
SQWMsgStat
msgStat
;
SQWRTStat
rtStat
;
SQWMsgStat
msgStat
;
SQWRTStat
rtStat
;
}
SQWStat
;
// Qnode/Vnode level task management
...
...
@@ -183,15 +183,15 @@ typedef struct SQWorker {
SQWorkerCfg
cfg
;
int8_t
nodeType
;
int32_t
nodeId
;
void
*
timer
;
void
*
timer
;
tmr_h
hbTimer
;
SRWLatch
schLock
;
// SRWLatch ctxLock;
SHashObj
*
schHash
;
// key: schedulerId, value: SQWSchStatus
SHashObj
*
ctxHash
;
// key: queryId+taskId, value: SQWTaskCtx
SMsgCb
msgCb
;
SQWStat
stat
;
int32_t
*
destroyed
;
SHashObj
*
schHash
;
// key: schedulerId, value: SQWSchStatus
SHashObj
*
ctxHash
;
// key: queryId+taskId, value: SQWTaskCtx
SMsgCb
msgCb
;
SQWStat
stat
;
int32_t
*
destroyed
;
}
SQWorker
;
typedef
struct
SQWorkerMgmt
{
...
...
@@ -208,16 +208,21 @@ typedef struct SQWorkerMgmt {
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
#define QW_STAT_GET(_item)
atomic_load_64(&(_item))
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_GET_EVENT(ctx, event)
atomic_load_8(&(ctx)->events[event])
#define QW_EVENT_RECEIVED(ctx, event)
(QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_EVENT_PROCESSED(ctx, event)
(QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0)
#define QW_SET_PHASE(ctx, _value) \
do { \
if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { \
atomic_store_8(&(ctx)->phase, _value); \
} \
} while (0)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
...
...
@@ -230,7 +235,7 @@ typedef struct SQWorkerMgmt {
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
*(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \
} while (0)
#define QW_GET_QTID(id, qId, tId, eId) \
do { \
(qId) = *(uint64_t *)(id); \
...
...
@@ -268,10 +273,10 @@ typedef struct SQWorkerMgmt {
#define QW_TLOG(_param, ...) qTrace("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DUMP(_param, ...) \
do { \
if (gQWDebug.dumpEnable) { \
do {
\
if (gQWDebug.dumpEnable) {
\
qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \
} \
}
\
} while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
...
...
@@ -287,12 +292,15 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
__VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
__VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, \
__VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
do { \
...
...
@@ -337,41 +345,41 @@ typedef struct SQWorkerMgmt {
} \
} while (0)
extern
SQWorkerMgmt
gQwMgmt
;
static
FORCE_INLINE
SQWorker
*
qwAcquire
(
int64_t
refId
)
{
return
(
SQWorker
*
)
taosAcquireRef
(
atomic_load_32
(
&
gQwMgmt
.
qwRef
),
refId
);
}
static
FORCE_INLINE
SQWorker
*
qwAcquire
(
int64_t
refId
)
{
return
(
SQWorker
*
)
taosAcquireRef
(
atomic_load_32
(
&
gQwMgmt
.
qwRef
),
refId
);
}
static
FORCE_INLINE
int32_t
qwRelease
(
int64_t
refId
)
{
return
taosReleaseRef
(
gQwMgmt
.
qwRef
,
refId
);
}
char
*
qwPhaseStr
(
int32_t
phase
);
char
*
qwBufStatusStr
(
int32_t
bufStatus
);
char
*
qwPhaseStr
(
int32_t
phase
);
char
*
qwBufStatusStr
(
int32_t
bufStatus
);
int32_t
qwAcquireAddScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
);
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
);
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
);
int32_t
qwAddTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
status
);
int32_t
qwAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
int32_t
qwGetTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
int32_t
qwAddAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
);
void
qwReleaseTaskCtx
(
SQWorker
*
mgmt
,
void
*
ctx
);
void
qwReleaseTaskCtx
(
SQWorker
*
mgmt
,
void
*
ctx
);
int32_t
qwKillTaskHandle
(
SQWTaskCtx
*
ctx
);
int32_t
qwUpdateTaskStatus
(
QW_FPARAMS_DEF
,
int8_t
status
);
int32_t
qwDropTask
(
QW_FPARAMS_DEF
);
void
qwSaveTbVersionInfo
(
qTaskInfo_t
pTaskInfo
,
SQWTaskCtx
*
ctx
);
void
qwSaveTbVersionInfo
(
qTaskInfo_t
pTaskInfo
,
SQWTaskCtx
*
ctx
);
int32_t
qwOpenRef
(
void
);
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
);
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
);
int32_t
qwUpdateTimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
);
int64_t
qwGetTimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
);
void
qwClearExpiredSch
(
SQWorker
*
mgmt
,
SArray
*
pExpiredSch
);
void
qwClearExpiredSch
(
SQWorker
*
mgmt
,
SArray
*
pExpiredSch
);
int32_t
qwAcquireScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
);
void
qwFreeTaskCtx
(
SQWTaskCtx
*
ctx
);
void
qwFreeTaskCtx
(
SQWTaskCtx
*
ctx
);
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
);
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
);
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
);
int32_t
qwDbgBuildAndSendRedirectRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SEpSet
*
pEpSet
);
int32_t
qwAddTaskCtx
(
QW_FPARAMS_DEF
);
void
qwDbgSimulateRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
void
qwDbgSimulateSleep
(
void
);
void
qwDbgSimulateDead
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
void
qwDbgSimulateRedirect
(
SQWMsg
*
qwMsg
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
void
qwDbgSimulateSleep
(
void
);
void
qwDbgSimulateDead
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
rsped
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
8a033aca
...
...
@@ -20,12 +20,12 @@
extern
"C"
{
#endif
#include "qwInt.h"
#include "dataSinkMgt.h"
#include "qwInt.h"
int32_t
qwAbortPrerocessQuery
(
QW_FPARAMS_DEF
);
int32_t
qwPreprocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
char
*
sql
);
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
char
*
sql
);
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessReady
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessFetch
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
...
...
@@ -35,11 +35,12 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
int32_t
qwBuildAndSendDropRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
);
int32_t
qwBuildAndSendCancelRsp
(
SRpcHandleInfo
*
pConn
,
int32_t
code
);
int32_t
qwBuildAndSendFetchRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
);
int32_t
qwBuildAndSendFetchRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
);
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
);
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
);
int32_t
qwBuildAndSendQueryRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SQWTaskCtx
*
ctx
);
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SArray
*
pExecList
);
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SArray
*
pExecList
);
int32_t
qwBuildAndSendErrorRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int8_t
rpcMalloc
,
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
8a033aca
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qwInt.h"
#include "qwMsg.h"
#include "qworker.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
redirectSimulate
=
false
,
.
deadSimulate
=
false
,
.
sleepSimulate
=
false
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
redirectSimulate
=
false
,
.
deadSimulate
=
false
,
.
sleepSimulate
=
false
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
...
...
@@ -29,15 +33,13 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
switch
(
oriStatus
)
{
case
JOB_TASK_STATUS_NULL
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_INIT
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_INIT
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_INIT
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -50,8 +52,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_
FAIL
&&
newStatus
!=
JOB_TASK_STATUS_
DROP
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_DROP
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -89,7 +91,8 @@ _return:
void
qwDbgDumpSchInfo
(
SQWorker
*
mgmt
,
SQWSchStatus
*
sch
,
int32_t
i
)
{
QW_LOCK
(
QW_READ
,
&
sch
->
tasksLock
);
QW_DLOG
(
"the %dth scheduler status, hbBrokenTs:%"
PRId64
",taskNum:%d"
,
i
,
sch
->
hbBrokenTs
,
taosHashGetSize
(
sch
->
tasksHash
));
QW_DLOG
(
"the %dth scheduler status, hbBrokenTs:%"
PRId64
",taskNum:%d"
,
i
,
sch
->
hbBrokenTs
,
taosHashGetSize
(
sch
->
tasksHash
));
QW_UNLOCK
(
QW_READ
,
&
sch
->
tasksLock
);
}
...
...
@@ -120,11 +123,10 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_DUMP
(
"total remain ctx num %d"
,
taosHashGetSize
(
mgmt
->
ctxHash
));
}
int32_t
qwDbgBuildAndSendRedirectRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SEpSet
*
pEpSet
)
{
int32_t
contLen
=
0
;
char
*
rsp
=
NULL
;
char
*
rsp
=
NULL
;
if
(
pEpSet
)
{
contLen
=
tSerializeSEpSet
(
NULL
,
0
,
pEpSet
);
rsp
=
rpcMallocCont
(
contLen
);
...
...
@@ -152,12 +154,12 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
if
(
*
rsped
)
{
return
;
}
if
(
gQWDebug
.
redirectSimulate
)
{
if
(
++
ignoreTime
<=
10
)
{
return
;
}
if
(
TDMT_SCH_QUERY
==
qwMsg
->
msgType
&&
(
0
==
taosRand
()
%
3
))
{
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
1
;
...
...
@@ -169,12 +171,12 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
strcpy
(
epSet
.
eps
[
2
].
fqdn
,
"localhost"
);
epSet
.
eps
[
2
].
port
=
7300
;
ctx
->
phase
=
QW_PHASE_POST_QUERY
;
ctx
->
phase
=
QW_PHASE_POST_QUERY
;
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
&
epSet
);
*
rsped
=
true
;
return
;
}
if
(
TDMT_SCH_MERGE_QUERY
==
qwMsg
->
msgType
&&
(
0
==
taosRand
()
%
3
))
{
QW_SET_PHASE
(
ctx
,
QW_PHASE_POST_QUERY
);
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
NULL
);
...
...
@@ -213,18 +215,18 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
static
int32_t
ignoreTime
=
0
;
if
(
++
ignoreTime
>
10
&&
0
==
taosRand
()
%
9
)
{
SRpcHandleInfo
*
pConn
=
((
ctx
->
msgType
==
TDMT_SCH_FETCH
||
ctx
->
msgType
==
TDMT_SCH_MERGE_FETCH
)
?
&
ctx
->
dataConnInfo
:
&
ctx
->
ctrlConnInfo
);
SRpcHandleInfo
*
pConn
=
((
ctx
->
msgType
==
TDMT_SCH_FETCH
||
ctx
->
msgType
==
TDMT_SCH_MERGE_FETCH
)
?
&
ctx
->
dataConnInfo
:
&
ctx
->
ctrlConnInfo
);
qwBuildAndSendErrorRsp
(
ctx
->
msgType
+
1
,
pConn
,
TSDB_CODE_RPC_BROKEN_LINK
);
qwBuildAndSendDropMsg
(
QW_FPARAMS
(),
pConn
);
*
rsped
=
true
;
return
;
}
}
int32_t
qwDbgEnableDebug
(
char
*
option
)
{
if
(
0
==
strcasecmp
(
option
,
"lock"
))
{
gQWDebug
.
lockEnable
=
true
;
...
...
@@ -263,8 +265,6 @@ int32_t qwDbgEnableDebug(char *option) {
}
qError
(
"invalid qw debug option:%s"
,
option
);
return
TSDB_CODE_APP_ERROR
;
}
source/libs/qworker/src/qwMsg.c
浏览文件 @
8a033aca
...
...
@@ -3,8 +3,8 @@
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworker.h"
#include "qwInt.h"
#include "qworker.h"
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
...
...
@@ -12,7 +12,8 @@
int32_t
qwMallocFetchRsp
(
int8_t
rpcMalloc
,
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)(
rpcMalloc
?
rpcReallocCont
(
*
rsp
,
msgSize
)
:
taosMemoryRealloc
(
*
rsp
,
msgSize
));
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)(
rpcMalloc
?
rpcReallocCont
(
*
rsp
,
msgSize
)
:
taosMemoryRealloc
(
*
rsp
,
msgSize
));
if
(
NULL
==
pRsp
)
{
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -21,7 +22,7 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r
if
(
NULL
==
*
rsp
)
{
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
}
*
rsp
=
pRsp
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -61,8 +62,8 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
}
int32_t
qwBuildAndSendQueryRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SQWTaskCtx
*
ctx
)
{
STbVerInfo
*
tbInfo
=
ctx
?
&
ctx
->
tbInfo
:
NULL
;
int64_t
affectedRows
=
ctx
?
ctx
->
affectedRows
:
0
;
STbVerInfo
*
tbInfo
=
ctx
?
&
ctx
->
tbInfo
:
NULL
;
int64_t
affectedRows
=
ctx
?
ctx
->
affectedRows
:
0
;
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
htonl
(
code
);
pRsp
->
affectedRows
=
htobe64
(
affectedRows
);
...
...
@@ -85,12 +86,12 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SArray
*
pExecList
)
{
SExplainExecInfo
*
pInfo
=
taosArrayGet
(
pExecList
,
0
);
SExplainRsp
rsp
=
{.
numOfPlans
=
taosArrayGetSize
(
pExecList
),
.
subplanInfo
=
pInfo
};
int32_t
qwBuildAndSendExplainRsp
(
SRpcHandleInfo
*
pConn
,
SArray
*
pExecList
)
{
SExplainExecInfo
*
pInfo
=
taosArrayGet
(
pExecList
,
0
);
SExplainRsp
rsp
=
{.
numOfPlans
=
taosArrayGetSize
(
pExecList
),
.
subplanInfo
=
pInfo
};
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSExplainRsp
(
pRsp
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -108,7 +109,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) {
int32_t
qwBuildAndSendHbRsp
(
SRpcHandleInfo
*
pConn
,
SSchedulerHbRsp
*
pStatus
,
int32_t
code
)
{
int32_t
contLen
=
tSerializeSSchedulerHbRsp
(
NULL
,
0
,
pStatus
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSSchedulerHbRsp
(
pRsp
,
contLen
,
pStatus
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -124,7 +125,8 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
)
{
int32_t
qwBuildAndSendFetchRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
)
{
if
(
NULL
==
pRsp
)
{
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
...
...
@@ -209,7 +211,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
...
...
@@ -309,7 +310,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -330,7 +331,8 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
SQWMsg
qwMsg
=
{
.
msgType
=
pMsg
->
msgType
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
QW_ERR_RET
(
qwPreprocessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
...
...
@@ -345,7 +347,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
}
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
...
...
@@ -367,7 +369,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
queryProcessed
,
1
);
...
...
@@ -383,13 +385,18 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
,
.
msgType
=
pMsg
->
msgType
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
,
.
msgType
=
pMsg
->
msgType
};
qwMsg
.
msgInfo
.
explain
=
msg
->
explain
;
qwMsg
.
msgInfo
.
taskType
=
msg
->
taskType
;
qwMsg
.
msgInfo
.
needFetch
=
msg
->
needFetch
;
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, type:%s, handle:%p, SQL:%s"
,
node
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
,
sql
);
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, type:%s, handle:%p, SQL:%s"
,
node
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
info
.
handle
,
sql
);
QW_ERR_JRET
(
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
,
sql
));
_return:
...
...
@@ -405,8 +412,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
cqueryProcessed
,
1
);
...
...
@@ -439,7 +446,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
}
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
fetchProcessed
,
1
);
...
...
@@ -472,7 +479,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
}
int32_t
qWorkerProcessRspMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
mgmt
)
{
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
rspProcessed
,
1
);
...
...
@@ -488,7 +495,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
...
...
@@ -531,7 +538,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
dropProcessed
,
1
);
...
...
@@ -575,7 +582,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
int32_t
code
=
0
;
SSchedulerHbReq
req
=
{
0
};
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
hbProcessed
,
1
);
...
...
@@ -606,20 +613,19 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDeleteMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SDeleteRes
*
pRes
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SVDeleteReq
req
=
{
0
};
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
deleteProcessed
,
1
);
tDeserializeSVDeleteReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
req
);
uint64_t
sId
=
req
.
sId
;
uint64_t
qId
=
req
.
queryId
;
uint64_t
tId
=
req
.
taskId
;
...
...
@@ -639,5 +645,3 @@ _return:
QW_RET
(
code
);
}
source/libs/qworker/src/qwUtil.c
浏览文件 @
8a033aca
...
...
@@ -295,7 +295,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
if
(
ctx
->
ctrlConnInfo
.
handle
)
{
tmsgReleaseHandle
(
&
ctx
->
ctrlConnInfo
,
TAOS_CONN_SERVER
);
}
ctx
->
ctrlConnInfo
.
handle
=
NULL
;
ctx
->
ctrlConnInfo
.
refId
=
-
1
;
...
...
@@ -454,21 +454,21 @@ void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksH
void
qwDestroyImpl
(
void
*
pMgmt
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
int8_t
nodeType
=
mgmt
->
nodeType
;
int32_t
nodeId
=
mgmt
->
nodeId
;
qDebug
(
"start to destroy qworker, type:%d, id:%d, handle:%p"
,
nodeType
,
nodeId
,
mgmt
);
int8_t
nodeType
=
mgmt
->
nodeType
;
int32_t
nodeId
=
mgmt
->
nodeId
;
qDebug
(
"start to destroy qworker, type:%d, id:%d, handle:%p"
,
nodeType
,
nodeId
,
mgmt
);
taosTmrStop
(
mgmt
->
hbTimer
);
mgmt
->
hbTimer
=
NULL
;
taosTmrCleanUp
(
mgmt
->
timer
);
uint64_t
qId
,
tId
;
int32_t
eId
;
void
*
pIter
=
taosHashIterate
(
mgmt
->
ctxHash
,
NULL
);
int32_t
eId
;
void
*
pIter
=
taosHashIterate
(
mgmt
->
ctxHash
,
NULL
);
while
(
pIter
)
{
SQWTaskCtx
*
ctx
=
(
SQWTaskCtx
*
)
pIter
;
void
*
key
=
taosHashGetKey
(
pIter
,
NULL
);
void
*
key
=
taosHashGetKey
(
pIter
,
NULL
);
QW_GET_QTID
(
key
,
qId
,
tId
,
eId
);
qwFreeTaskCtx
(
ctx
);
...
...
@@ -486,14 +486,14 @@ void qwDestroyImpl(void *pMgmt) {
taosHashCleanup
(
mgmt
->
schHash
);
*
mgmt
->
destroyed
=
1
;
taosMemoryFree
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
qwCloseRef
();
qDebug
(
"qworker destroyed, type:%d, id:%d, handle:%p"
,
nodeType
,
nodeId
,
mgmt
);
qDebug
(
"qworker destroyed, type:%d, id:%d, handle:%p"
,
nodeType
,
nodeId
,
mgmt
);
}
int32_t
qwOpenRef
(
void
)
{
...
...
@@ -550,11 +550,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return
-
1
;
}
void
qwClearExpiredSch
(
SQWorker
*
mgmt
,
SArray
*
pExpiredSch
)
{
void
qwClearExpiredSch
(
SQWorker
*
mgmt
,
SArray
*
pExpiredSch
)
{
int32_t
num
=
taosArrayGetSize
(
pExpiredSch
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
uint64_t
*
sId
=
taosArrayGet
(
pExpiredSch
,
i
);
uint64_t
*
sId
=
taosArrayGet
(
pExpiredSch
,
i
);
SQWSchStatus
*
pSch
=
NULL
;
if
(
qwAcquireScheduler
(
mgmt
,
*
sId
,
QW_WRITE
,
&
pSch
))
{
continue
;
...
...
@@ -569,5 +568,3 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
}
}
source/libs/qworker/src/qworker.c
浏览文件 @
8a033aca
...
...
@@ -7,9 +7,9 @@
#include "qwInt.h"
#include "qwMsg.h"
#include "tcommon.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tname.h"
#include "tdatablock.h"
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
...
...
@@ -17,8 +17,8 @@ SQWorkerMgmt gQwMgmt = {
.
qwNum
=
0
,
};
static
void
freeBlock
(
void
*
param
)
{
SSDataBlock
*
pBlock
=
*
(
SSDataBlock
**
)
param
;
static
void
freeBlock
(
void
*
param
)
{
SSDataBlock
*
pBlock
=
*
(
SSDataBlock
**
)
param
;
blockDataDestroy
(
pBlock
);
}
...
...
@@ -100,7 +100,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t
execNum
=
0
;
qTaskInfo_t
taskHandle
=
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
SLocalFetch
localFetch
=
{(
void
*
)
mgmt
,
ctx
->
localExec
,
qWorkerProcessLocalFetch
,
ctx
->
explainRes
};
SLocalFetch
localFetch
=
{(
void
*
)
mgmt
,
ctx
->
localExec
,
qWorkerProcessLocalFetch
,
ctx
->
explainRes
};
SArray
*
pResList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
while
(
true
)
{
...
...
@@ -512,7 +512,7 @@ _return:
QW_TASK_DLOG
(
"query msg rsped, handle:%p, code:%x - %s"
,
ctx
->
ctrlConnInfo
.
handle
,
code
,
tstrerror
(
code
));
}
}
ctx
->
queryRsped
=
true
;
}
...
...
@@ -541,8 +541,8 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
}
int32_t
qwPreprocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
...
...
@@ -1113,10 +1113,10 @@ void qWorkerDestroy(void **qWorkerMgmt) {
return
;
}
int32_t
destroyed
=
0
;
int32_t
destroyed
=
0
;
SQWorker
*
mgmt
=
*
qWorkerMgmt
;
mgmt
->
destroyed
=
&
destroyed
;
if
(
taosRemoveRef
(
gQwMgmt
.
qwRef
,
mgmt
->
refId
))
{
qError
(
"remove qw from ref list failed, refId:%"
PRIx64
,
mgmt
->
refId
);
return
;
...
...
@@ -1153,15 +1153,16 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessLocalQuery
(
void
*
pMgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int64_t
rId
,
int32_t
eId
,
SQWMsg
*
qwMsg
,
SArray
*
explainRes
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
int32_t
qWorkerProcessLocalQuery
(
void
*
pMgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int64_t
rId
,
int32_t
eId
,
SQWMsg
*
qwMsg
,
SArray
*
explainRes
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SSubplan
*
plan
=
(
SSubplan
*
)
qwMsg
->
msg
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SReadHandle
rHandle
=
{
0
};
SReadHandle
rHandle
=
{
0
};
QW_ERR_JRET
(
qwAddTaskCtx
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_INIT
));
...
...
@@ -1178,7 +1179,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
rHandle
.
pMsgCb
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgCb
));
rHandle
.
pMsgCb
->
clientRpc
=
qwMsg
->
connInfo
.
handle
;
code
=
qCreateExecTask
(
&
rHandle
,
mgmt
->
nodeId
,
tId
,
plan
,
&
pTaskInfo
,
&
sinkHandle
,
NULL
,
OPTR_EXEC_MODEL_BATCH
);
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%x - %s"
,
code
,
tstrerror
(
code
));
...
...
@@ -1199,7 +1200,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
_return:
taosMemoryFree
(
rHandle
.
pMsgCb
);
input
.
code
=
code
;
input
.
msgType
=
qwMsg
->
msgType
;
code
=
qwHandlePostPhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_POST_QUERY
,
&
input
,
NULL
);
...
...
@@ -1212,13 +1213,14 @@ _return:
QW_RET
(
code
);
}
int32_t
qWorkerProcessLocalFetch
(
void
*
pMgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int64_t
rId
,
int32_t
eId
,
void
**
pRsp
,
SArray
*
explainRes
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
int32_t
code
=
0
;
int32_t
dataLen
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
bool
queryStop
=
false
;
int32_t
qWorkerProcessLocalFetch
(
void
*
pMgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int64_t
rId
,
int32_t
eId
,
void
**
pRsp
,
SArray
*
explainRes
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
int32_t
code
=
0
;
int32_t
dataLen
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
bool
queryStop
=
false
;
SQWPhaseInput
input
=
{
0
};
...
...
@@ -1228,15 +1230,15 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
ctx
->
msgType
=
TDMT_SCH_MERGE_FETCH
;
ctx
->
explainRes
=
explainRes
;
SOutputData
sOutput
=
{
0
};
while
(
true
)
{
QW_ERR_JRET
(
qwGetQueryResFromSink
(
QW_FPARAMS
(),
ctx
,
&
dataLen
,
&
rsp
,
&
sOutput
));
if
(
NULL
==
rsp
)
{
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
queryStop
));
continue
;
}
else
{
bool
qComplete
=
(
DS_BUF_EMPTY
==
sOutput
.
bufStatus
&&
sOutput
.
queryEnd
);
...
...
@@ -1259,5 +1261,3 @@ _return:
QW_RET
(
code
);
}
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
8a033aca
此差异已折叠。
点击以展开。
tools/scripts/codeFormat.sh
浏览文件 @
8a033aca
...
...
@@ -22,8 +22,8 @@ FORMAT_DIR_LIST=(
"
${
PRJ_ROOT_DIR
}
/source/libs/nodes"
# "${PRJ_ROOT_DIR}/source/libs/parser"
"
${
PRJ_ROOT_DIR
}
/source/libs/planner"
#
"${PRJ_ROOT_DIR}/source/libs/qcom"
#
"${PRJ_ROOT_DIR}/source/libs/qworker"
"
${
PRJ_ROOT_DIR
}
/source/libs/qcom"
"
${
PRJ_ROOT_DIR
}
/source/libs/qworker"
"
${
PRJ_ROOT_DIR
}
/source/libs/scalar"
"
${
PRJ_ROOT_DIR
}
/source/libs/stream"
"
${
PRJ_ROOT_DIR
}
/source/libs/sync"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录