Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
小楼昨夜-听风雨
TDengine
提交
8f65f438
T
TDengine
项目概览
小楼昨夜-听风雨
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8f65f438
编写于
6月 01, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
rpc changes to make re-auth work
上级
403c58c2
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
38 addition
and
22 deletion
+38
-22
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+38
-22
未找到文件。
src/rpc/src/rpcMain.c
浏览文件 @
8f65f438
...
...
@@ -124,7 +124,7 @@ typedef struct SRpcConn {
}
SRpcConn
;
int
tsRpcMaxUdpSize
=
15000
;
// bytes
int
tsProgressTimer
=
100
;
// not configurable
int
tsRpcMaxRetry
;
int
tsRpcHeadSize
;
...
...
@@ -203,7 +203,8 @@ static void rpcUnlockConn(SRpcConn *pConn);
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
SRpcInfo
*
pRpc
;
tsRpcMaxRetry
=
tsRpcMaxTime
*
1000
*
2
/
tsRpcTimer
;
tsProgressTimer
=
tsRpcTimer
/
2
;
tsRpcMaxRetry
=
tsRpcMaxTime
*
1000
/
tsProgressTimer
;
tsRpcHeadSize
=
RPC_MSG_OVERHEAD
;
tsRpcOverhead
=
sizeof
(
SRpcReqContext
);
...
...
@@ -419,8 +420,11 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pConn
->
rspMsgLen
=
msgLen
;
if
(
pMsg
->
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
pConn
->
inTranId
--
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
taosTmrStopA
(
&
pConn
->
pTimer
);
// taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
// set the idle timer to monitor the activity
taosTmrReset
(
rpcProcessIdleTimer
,
pRpc
->
idleTime
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pIdleTimer
);
rpcSendMsgToPeer
(
pConn
,
msg
,
msgLen
);
pConn
->
secured
=
1
;
// connection shall be secured
...
...
@@ -682,6 +686,7 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
tError
(
"%s %p, failed to set up connection(%s)"
,
pRpc
->
label
,
pContext
->
ahandle
,
tstrerror
(
terrno
));
}
pConn
->
tretry
=
0
;
return
pConn
;
}
...
...
@@ -747,20 +752,28 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
taosTmrStopA
(
&
pConn
->
pTimer
);
pConn
->
retry
=
0
;
if
(
pHead
->
code
==
TSDB_CODE_AUTH_REQUIRED
&&
pRpc
->
spi
)
{
tTrace
(
"%s, authentication shall be restarted"
,
pConn
->
info
);
pConn
->
secured
=
0
;
rpcSendMsgToPeer
(
pConn
,
pConn
->
pReqMsg
,
pConn
->
reqMsgLen
);
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
);
return
TSDB_CODE_ALREADY_PROCESSED
;
}
if
(
pHead
->
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
pConn
->
tretry
<=
tsRpcMaxRetry
)
{
tTrace
(
"%s, peer is still processing the transaction
"
,
pConn
->
info
);
tTrace
(
"%s, peer is still processing the transaction
, retry:%d"
,
pConn
->
info
,
pConn
->
tretry
);
pConn
->
tretry
++
;
rpcSendReqHead
(
pConn
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
);
return
TSDB_CODE_ALREADY_PROCESSED
;
}
else
{
// peer still in processing, give up
return
TSDB_CODE_TOO_SLOW
;
tTrace
(
"%s, server processing takes too long time, give up"
,
pConn
->
info
);
pHead
->
code
=
TSDB_CODE_TOO_SLOW
;
}
}
pConn
->
tretry
=
0
;
pConn
->
outType
=
0
;
pConn
->
pReqMsg
=
NULL
;
pConn
->
reqMsgLen
=
0
;
...
...
@@ -819,7 +832,9 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
terrno
=
rpcProcessReqHead
(
pConn
,
pHead
);
pConn
->
connType
=
pRecv
->
connType
;
taosTmrReset
(
rpcProcessIdleTimer
,
pRpc
->
idleTime
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pIdleTimer
);
// client shall send the request within tsRpcTime again, put 20 mseconds tolerance
taosTmrReset
(
rpcProcessIdleTimer
,
tsRpcTimer
+
20
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pIdleTimer
);
}
else
{
terrno
=
rpcProcessRspHead
(
pConn
,
pHead
);
}
...
...
@@ -934,7 +949,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
rpcMsg
.
handle
=
pConn
;
taosTmrReset
(
rpcProcessProgressTimer
,
tsRpcTimer
/
2
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessProgressTimer
,
tsProgressTimer
,
pConn
,
pRpc
->
tmrCtrl
);
(
*
(
pRpc
->
cfp
))(
&
rpcMsg
,
NULL
);
}
else
{
// it's a response
...
...
@@ -942,14 +957,12 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg
.
handle
=
pContext
->
ahandle
;
pConn
->
pContext
=
NULL
;
if
(
pHead
->
code
==
TSDB_CODE_AUTH_REQUIRED
)
{
pConn
->
secured
=
0
;
rpcSendReqToServer
(
pRpc
,
pContext
);
return
;
}
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
rpcAddConnIntoCache
(
pRpc
->
pCache
,
pConn
,
pConn
->
peerFqdn
,
pContext
->
ipSet
.
port
[
pContext
->
ipSet
.
inUse
],
pConn
->
connType
);
if
(
pHead
->
code
!=
TSDB_CODE_TOO_SLOW
)
{
rpcAddConnIntoCache
(
pRpc
->
pCache
,
pConn
,
pConn
->
peerFqdn
,
pContext
->
ipSet
.
port
[
pContext
->
ipSet
.
inUse
],
pConn
->
connType
);
}
else
{
rpcCloseConn
(
pConn
);
}
if
(
pHead
->
code
==
TSDB_CODE_REDIRECT
)
{
pContext
->
redirect
++
;
...
...
@@ -1038,6 +1051,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
pReplyHead
->
sourceId
=
pRecvHead
->
destId
;
pReplyHead
->
destId
=
pRecvHead
->
sourceId
;
pReplyHead
->
linkUid
=
pRecvHead
->
linkUid
;
pReplyHead
->
ahandle
=
pRecvHead
->
ahandle
;
pReplyHead
->
code
=
htonl
(
code
);
msgLen
=
sizeof
(
SRpcHead
);
...
...
@@ -1094,8 +1108,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn
->
reqMsgLen
=
msgLen
;
pConn
->
pContext
=
pContext
;
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
rpcSendMsgToPeer
(
pConn
,
msg
,
msgLen
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
rpcUnlockConn
(
pConn
);
}
...
...
@@ -1171,7 +1185,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if
(
pConn
->
retry
<
4
)
{
tTrace
(
"%s, re-send msg:%s to %s:%hu"
,
pConn
->
info
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
rpcSendMsgToPeer
(
pConn
,
pConn
->
pReqMsg
,
pConn
->
reqMsgLen
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
);
}
else
{
// close the connection
tTrace
(
"%s, failed to send msg:%s to %s:%hu"
,
pConn
->
info
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerFqdn
,
pConn
->
peerPort
);
...
...
@@ -1224,7 +1238,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
if
(
pConn
->
inType
&&
pConn
->
user
[
0
])
{
tTrace
(
"%s, progress timer expired, send progress"
,
pConn
->
info
);
rpcSendQuickRsp
(
pConn
,
TSDB_CODE_ACTION_IN_PROGRESS
);
taosTmrReset
(
rpcProcessProgressTimer
,
tsRpcTimer
/
2
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessProgressTimer
,
tsProgressTimer
,
pConn
,
pRpc
->
tmrCtrl
);
}
else
{
tTrace
(
"%s, progress timer:%p not processed"
,
pConn
->
info
,
tmrId
);
}
...
...
@@ -1356,15 +1370,17 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
if
((
pConn
->
secured
&&
pHead
->
spi
==
0
)
||
(
pHead
->
spi
==
0
&&
pConn
->
spi
==
0
)){
// secured link, or no authentication
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, secured link, no auth is required", pConn->info);
return
0
;
}
if
(
!
rpcIsReq
(
pHead
->
msgType
)
)
{
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_AUTH_FAILURE
||
if
(
code
==
TSDB_CODE_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_AUTH_FAILURE
||
code
==
TSDB_CODE_AUTH_REQUIRED
||
code
==
TSDB_CODE_INVALID_USER
||
code
==
TSDB_CODE_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return
0
;
}
}
...
...
@@ -1387,12 +1403,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
}
else
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
)
-
sizeof
(
SRpcDigest
);
if
(
!
rpcIsReq
(
pHead
->
msgType
)
)
pConn
->
secured
=
1
;
// link is secured for client
//tTrace("%s, message is authenticated", pConn->info);
//
tTrace("%s, message is authenticated", pConn->info);
}
}
}
else
{
tError
(
"%s, auth spi:%d not matched with received:%d"
,
pConn
->
info
,
pConn
->
spi
,
pHead
->
spi
);
code
=
TSDB_CODE_AUTH_FAILURE
;
code
=
pHead
->
spi
?
TSDB_CODE_AUTH_FAILURE
:
TSDB_CODE_AUTH_REQUIRED
;
}
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录