Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0afdb145
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0afdb145
编写于
4月 22, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
162b3087
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
84 addition
and
10 deletion
+84
-10
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+0
-1
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+0
-1
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+2
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+40
-5
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+42
-2
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
0afdb145
...
...
@@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/
int32_t
catalogGetAllMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetQnodeList
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
SArray
*
pQnodeList
);
int32_t
catalogGetExpiredSTables
(
SCatalog
*
pCatalog
,
SSTableMetaVersion
**
stables
,
uint32_t
*
num
);
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
0afdb145
...
...
@@ -1325,7 +1325,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
// 1. set the parsed value from sql string
for
(
int
c
=
0
;
c
<
spd
->
numOfBound
;
++
c
)
{
SSchema
*
pColSchema
=
&
pSchema
[
spd
->
boundColumns
[
c
]
-
1
];
...
...
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
0afdb145
...
...
@@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
rsp
,
int32_t
code
);
int32_t
qwRegisterBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
0afdb145
...
...
@@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwRegisterBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwRegister
Query
BrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_QUERY
,
&
input
,
NULL
));
...
...
@@ -1285,23 +1285,51 @@ _return:
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHbLinkBroken
(
SQWorkerMgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
QW_ERR_RET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
if
(
qwMsg
->
connInfo
.
handle
==
sch
->
hbConnInfo
.
handle
)
{
tmsgReleaseHandle
(
sch
->
hbConnInfo
.
handle
,
TAOS_CONN_SERVER
);
sch
->
hbConnInfo
.
handle
=
NULL
;
sch
->
hbConnInfo
.
ahandle
=
NULL
;
QW_DLOG
(
"release hb handle due to connection broken, handle:%p"
,
qwMsg
->
connInfo
.
handle
);
}
else
{
QW_DLOG
(
"ignore hb connection broken, handle:%p, currentHandle:%p"
,
qwMsg
->
connInfo
.
handle
,
sch
->
hbConnInfo
.
handle
);
}
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHb
(
SQWorkerMgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
uint64_t
seqId
=
0
;
void
*
origHandle
=
NULL
;
memcpy
(
&
rsp
.
epId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
if
(
qwMsg
->
code
)
{
QW_RET
(
qwProcessHbLinkBroken
(
mgmt
,
qwMsg
,
req
));
}
QW_ERR_JRET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
QW_ERR_JRET
(
qwRegisterHbBrokenLinkArg
(
mgmt
,
req
->
sId
,
&
qwMsg
->
connInfo
));
QW_LOCK
(
QW_WRITE
,
&
sch
->
hbConnLock
);
if
(
sch
->
hbConnInfo
.
handle
)
{
tmsgReleaseHandle
(
sch
->
hbConnInfo
.
handle
,
TAOS_CONN_SERVER
);
}
memcpy
(
&
sch
->
hbConnInfo
,
&
qwMsg
->
connInfo
,
sizeof
(
qwMsg
->
connInfo
));
memcpy
(
&
sch
->
hbEpId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
...
...
@@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return:
memcpy
(
&
rsp
.
epId
,
&
req
->
epId
,
sizeof
(
req
->
epId
));
qwBuildAndSendHbRsp
(
&
qwMsg
->
connInfo
,
&
rsp
,
code
);
if
(
code
)
{
tmsgReleaseHandle
(
qwMsg
->
connInfo
.
handle
,
TAOS_CONN_SERVER
);
}
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
QW_RET
(
TSDB_CODE_SUCCESS
);
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
0afdb145
...
...
@@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
}
int32_t
qwRegisterBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
int32_t
qwRegister
Query
BrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
...
...
@@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
)
{
SSchedulerHbReq
req
=
{
0
};
req
.
header
.
vgId
=
mgmt
->
nodeId
;
req
.
sId
=
sId
;
int32_t
msgSize
=
tSerializeSSchedulerHbReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
QW_SCH_ELOG
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
void
*
msg
=
rpcMallocCont
(
msgSize
);
if
(
NULL
==
msg
)
{
QW_SCH_ELOG
(
"calloc %d failed"
,
msgSize
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
tSerializeSSchedulerHbReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
QW_SCH_ELOG
(
"tSerializeSSchedulerHbReq hbReq failed, size:%d"
,
msgSize
);
taosMemoryFree
(
msg
);
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SRpcMsg
pMsg
=
{
.
handle
=
pConn
->
handle
,
.
ahandle
=
pConn
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_HEARTBEAT
,
.
pCont
=
msg
,
.
contLen
=
sizeof
(
SSchedulerHbReq
),
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
,
};
tmsgRegisterBrokenLinkArg
(
&
mgmt
->
msgCb
,
&
pMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
...
...
@@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t
sId
=
req
.
sId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
};
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
};
qwMsg
.
connInfo
.
handle
=
pMsg
->
handle
;
qwMsg
.
connInfo
.
ahandle
=
pMsg
->
ahandle
;
if
(
TSDB_CODE_RPC_NETWORK_UNAVAIL
==
pMsg
->
code
)
{
QW_SCH_DLOG
(
"receive Hb msg due to network broken, error:%s"
,
tstrerror
(
pMsg
->
code
));
}
QW_SCH_DLOG
(
"processHb start, node:%p, handle:%p"
,
node
,
pMsg
->
handle
);
QW_ERR_RET
(
qwProcessHb
(
mgmt
,
&
qwMsg
,
&
req
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录