Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ac893cff
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ac893cff
编写于
3月 01, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix the bug for authentication
上级
ffdc93d1
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
78 addition
and
51 deletion
+78
-51
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+44
-47
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+2
-2
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+11
-2
src/rpc/test/rserver.c
src/rpc/test/rserver.c
+21
-0
未找到文件。
src/rpc/src/rpcMain.c
浏览文件 @
ac893cff
...
...
@@ -394,8 +394,8 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
// set msg header
pHead
->
version
=
1
;
pHead
->
msgType
=
pConn
->
inType
+
1
;
pHead
->
spi
=
0
;
pHead
->
encrypt
=
0
;
pHead
->
spi
=
pConn
->
spi
;
pHead
->
encrypt
=
pConn
->
encrypt
;
pHead
->
tranId
=
pConn
->
inTranId
;
pHead
->
sourceId
=
pConn
->
ownId
;
pHead
->
destId
=
pConn
->
peerId
;
...
...
@@ -543,7 +543,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
static
SRpcConn
*
rpcAllocateServerConn
(
SRpcInfo
*
pRpc
,
SRecvInfo
*
pRecv
)
{
SRpcConn
*
pConn
=
NULL
;
char
hashstr
[
40
];
char
hashstr
[
40
]
=
{
0
}
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
sprintf
(
hashstr
,
"%x:%x:%x:%d"
,
pRecv
->
ip
,
pHead
->
uid
,
pHead
->
sourceId
,
pRecv
->
connType
);
...
...
@@ -565,7 +565,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn
->
sid
=
sid
;
pConn
->
tranId
=
(
uint16_t
)(
rand
()
&
0xFFFF
);
pConn
->
ownId
=
htonl
(
pConn
->
sid
);
if
(
pRpc
->
afp
&&
(
*
pRpc
->
afp
)(
pConn
->
user
,
&
pConn
->
spi
,
&
pConn
->
encrypt
,
pConn
->
secret
,
pConn
->
ckey
))
{
if
(
pRpc
->
afp
&&
(
*
pRpc
->
afp
)(
pConn
->
user
,
&
pConn
->
spi
,
&
pConn
->
encrypt
,
pConn
->
secret
,
pConn
->
ckey
)
<
0
)
{
tWarn
(
"%s %p, user not there"
,
pRpc
->
label
,
pConn
);
taosFreeId
(
pRpc
->
idPool
,
sid
);
// sid shall be released
terrno
=
TSDB_CODE_INVALID_USER
;
...
...
@@ -718,21 +718,12 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
sid
=
htonl
(
pHead
->
destId
);
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
pHead
->
port
=
htons
(
pHead
->
port
);
if
(
pHead
->
msgType
>=
TSDB_MSG_TYPE_MAX
||
pHead
->
msgType
<=
0
)
{
tTrace
(
"%s sid:%d, invalid message type:%d"
,
pRpc
->
label
,
sid
,
pHead
->
msgType
);
terrno
=
TSDB_CODE_INVALID_MSG_TYPE
;
return
NULL
;
}
if
(
pRecv
->
msgLen
!=
pHead
->
msgLen
)
{
tTrace
(
"%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d"
,
pRpc
->
label
,
sid
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
msgLen
,
pHead
->
msgLen
);
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
return
NULL
;
}
if
(
sid
<
0
||
sid
>=
pRpc
->
sessions
)
{
tTrace
(
"%s sid:%d, sid is out of range, max sid:%d, %s discarded"
,
pRpc
->
label
,
sid
,
pRpc
->
sessions
,
taosMsg
[
pHead
->
msgType
]);
...
...
@@ -754,10 +745,14 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
if
(
pRecv
->
port
)
pConn
->
peerPort
=
pRecv
->
port
;
if
(
pHead
->
port
)
pConn
->
peerPort
=
pHead
->
port
;
if
(
pHead
->
port
)
pConn
->
peerPort
=
htons
(
pHead
->
port
)
;
if
(
pHead
->
uid
)
pConn
->
peerUid
=
pHead
->
uid
;
terrno
=
rpcCheckAuthentication
(
pConn
,
(
char
*
)
pHead
,
pRecv
->
msgLen
);
// code can be transformed only after authentication
pHead
->
code
=
htonl
(
pHead
->
code
);
if
(
terrno
==
0
)
{
if
(
pHead
->
msgType
!=
TSDB_MSG_TYPE_REG
&&
pHead
->
encrypt
)
{
// decrypt here
...
...
@@ -903,9 +898,9 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
pReplyHead
->
version
=
pRecvHead
->
version
;
pReplyHead
->
msgType
=
(
char
)(
pRecvHead
->
msgType
+
1
);
pReplyHead
->
spi
=
0
;
pReplyHead
->
encrypt
=
0
;
pReplyHead
->
encrypt
=
pRecvHead
->
encrypt
;
pReplyHead
->
tranId
=
pRecvHead
->
tranId
;
pReplyHead
->
sourceId
=
0
;
pReplyHead
->
sourceId
=
pRecvHead
->
destId
;
pReplyHead
->
destId
=
pRecvHead
->
sourceId
;
memcpy
(
pReplyHead
->
user
,
pRecvHead
->
user
,
tListLen
(
pReplyHead
->
user
));
...
...
@@ -915,7 +910,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
if
(
code
==
TSDB_CODE_INVALID_TIME_STAMP
)
{
// include a time stamp if client's time is not synchronized well
uint8_t
*
pContent
=
pReplyHead
->
content
;
timeStamp
=
taosGetTimestampSec
(
);
timeStamp
=
htonl
(
taosGetTimestampSec
()
);
memcpy
(
pContent
,
&
timeStamp
,
sizeof
(
timeStamp
));
msgLen
+=
sizeof
(
timeStamp
);
}
...
...
@@ -1028,10 +1023,10 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn
->
retry
++
;
if
(
pConn
->
retry
<
4
)
{
tTrace
(
"%s %p, re-send msg:%s to %s:%hu"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerIpstr
,
pConn
->
peerPort
);
tTrace
(
"%s %p, re-send msg:%s to %s:%hu
retry:%d
"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pConn
->
outType
],
pConn
->
peerIpstr
,
pConn
->
peerPort
,
pConn
->
retry
);
rpcSendMsgToPeer
(
pConn
,
pConn
->
pReqMsg
,
pConn
->
reqMsgLen
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
<<
pConn
->
retry
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
}
else
{
// close the connection
tTrace
(
"%s %p, failed to send msg:%s to %s:%hu"
,
pRpc
->
label
,
pConn
,
...
...
@@ -1080,12 +1075,6 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcUnlockConn
(
pConn
);
}
static
void
rpcFreeOutMsg
(
void
*
msg
)
{
if
(
msg
==
NULL
)
return
;
char
*
req
=
((
char
*
)
msg
)
-
sizeof
(
SRpcReqContext
);
free
(
req
);
}
static
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
)
{
SRpcHead
*
pHead
=
rpcHeadFromCont
(
pCont
);
int32_t
finalLen
=
0
;
...
...
@@ -1157,14 +1146,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
return
pHead
;
}
static
int
rpcAuthenticateMsg
(
uint8_t
*
pMsg
,
int
msgLen
,
uint8_t
*
pAuth
,
uint8_t
*
pKey
)
{
static
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
MD5_CTX
context
;
int
ret
=
-
1
;
MD5Init
(
&
context
);
MD5Update
(
&
context
,
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
pMsg
,
msgLen
);
MD5Update
(
&
context
,
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Final
(
&
context
);
if
(
memcmp
(
context
.
digest
,
pAuth
,
sizeof
(
context
.
digest
))
==
0
)
ret
=
0
;
...
...
@@ -1172,18 +1161,16 @@ static int rpcAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t
return
ret
;
}
static
int
rpcBuildAuthHead
(
uint8_t
*
pMsg
,
int
msgLen
,
uint8_t
*
pAuth
,
uint8_t
*
pKey
)
{
static
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
MD5_CTX
context
;
MD5Init
(
&
context
);
MD5Update
(
&
context
,
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
MD5Update
(
&
context
,
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Final
(
&
context
);
memcpy
(
pAuth
,
context
.
digest
,
sizeof
(
context
.
digest
));
return
0
;
}
static
int
rpcAddAuthPart
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
)
{
...
...
@@ -1196,7 +1183,7 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
pDigest
->
timeStamp
=
htonl
(
taosGetTimestampSec
());
msgLen
+=
sizeof
(
SRpcDigest
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
rpcBuildAuthHead
(
(
uint8_t
*
)
pHead
,
msgLen
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
(
uint8_t
*
)
pConn
->
secret
);
rpcBuildAuthHead
(
pHead
,
msgLen
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
pConn
->
secret
);
}
else
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
}
...
...
@@ -1209,7 +1196,21 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
int32_t
code
=
0
;
if
(
pConn
->
spi
==
0
)
return
0
;
if
(
pConn
->
spi
==
0
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
return
0
;
}
if
(
!
rpcIsReq
(
pHead
->
msgType
)
)
{
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_AUTH_FAILURE
||
code
==
TSDB_CODE_INVALID_USER
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
return
0
;
}
}
code
=
0
;
if
(
pHead
->
spi
==
pConn
->
spi
)
{
// authentication
...
...
@@ -1219,24 +1220,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta
=
(
int32_t
)
htonl
(
pDigest
->
timeStamp
);
delta
-=
(
int32_t
)
taosGetTimestampSec
();
if
(
abs
(
delta
)
>
900
)
{
tWarn
(
"%s %p, time diff:%d is too big, msg discarded, timestamp:%d"
,
pRpc
->
label
,
pConn
,
delta
,
htonl
(
pDigest
->
timeStamp
));
tWarn
(
"%s %p, time diff:%d is too big, msg discarded"
,
pRpc
->
label
,
pConn
,
delta
);
code
=
TSDB_CODE_INVALID_TIME_STAMP
;
}
else
{
if
(
rpcAuthenticateMsg
(
(
uint8_t
*
)
pHead
,
msgLen
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
(
uint8_t
*
)
pConn
->
secret
)
<
0
)
{
if
(
rpcAuthenticateMsg
(
pHead
,
msgLen
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
pConn
->
secret
)
<
0
)
{
tError
(
"%s %p, authentication failed, msg discarded"
,
pRpc
->
label
,
pConn
);
code
=
TSDB_CODE_AUTH_FAILURE
;
}
else
{
pHead
->
msgLen
-=
sizeof
(
SRpcDigest
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
)
-
sizeof
(
SRpcDigest
);
}
}
}
else
{
// if it is request or response with code 0, msg shall be discarded
if
(
rpcIsReq
(
pHead
->
msgType
)
||
(
pHead
->
content
[
0
]
==
0
))
{
tTrace
(
"%s %p, auth spi not matched, msg discarded"
,
pRpc
->
label
,
pConn
);
code
=
TSDB_CODE_AUTH_FAILURE
;
}
}
return
code
;
}
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
ac893cff
...
...
@@ -285,9 +285,9 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
//tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
// port, dataLen, ret, pConn->localPort, chandle);
int
ret
=
(
int
)
sendto
(
pConn
->
fd
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
tTrace
(
"%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x"
,
pConn
->
label
,
destAdd
.
sin_addr
.
s_addr
,
port
,
dataLen
,
ret
,
pConn
->
localPort
,
chandle
);
return
ret
;
}
...
...
src/rpc/test/rclient.c
浏览文件 @
ac893cff
...
...
@@ -110,6 +110,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
"mypassword"
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
...
...
@@ -128,11 +129,16 @@ int main(int argc, char *argv[]) {
numOfReqs
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-a"
)
==
0
&&
i
<
argc
-
1
)
{
appThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-o"
)
==
0
&&
i
<
argc
-
1
)
{
tsCompressMsgSize
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-u"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
user
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
secret
=
argv
[
++
i
];
}
else
if
(
strcmp
(
argv
[
i
],
"-spi"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
spi
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-i ip]: first server IP address, default is:%s
\n
"
,
serverIp
);
...
...
@@ -144,6 +150,9 @@ int main(int argc, char *argv[]) {
printf
(
" [-a threads]: number of app threads, default is:%d
\n
"
,
appThreads
);
printf
(
" [-n requests]: number of requests per thread, default is:%d
\n
"
,
numOfReqs
);
printf
(
" [-o compSize]: compression message size, default is:%d
\n
"
,
tsCompressMsgSize
);
printf
(
" [-u user]: user name for the connection, default is:%s
\n
"
,
rpcInit
.
user
);
printf
(
" [-k secret]: password for the connection, default is:%s
\n
"
,
rpcInit
.
secret
);
printf
(
" [-spi SPI]: security parameter index, default is:%d
\n
"
,
rpcInit
.
spi
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
rpcDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
...
...
src/rpc/test/rserver.c
浏览文件 @
ac893cff
...
...
@@ -68,6 +68,26 @@ void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) {
}
int
retrieveAuthInfo
(
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
if
(
strcmp
(
meterId
,
"michael"
)
==
0
)
{
*
spi
=
1
;
*
encrypt
=
0
;
strcpy
(
secret
,
"mypassword"
);
strcpy
(
ckey
,
"key"
);
}
else
if
(
strcmp
(
meterId
,
"jeff"
)
==
0
)
{
*
spi
=
0
;
*
encrypt
=
0
;
}
else
{
ret
=
-
1
;
// user not there
}
return
ret
;
}
void
processRequestMsg
(
char
type
,
void
*
pCont
,
int
contLen
,
void
*
thandle
,
int32_t
code
)
{
tTrace
(
"request is received, type:%d, contLen:%d"
,
type
,
contLen
);
SRpcMsg
rpcMsg
;
...
...
@@ -91,6 +111,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
cfp
=
processRequestMsg
;
rpcInit
.
sessions
=
1000
;
rpcInit
.
idleTime
=
2000
;
rpcInit
.
afp
=
retrieveAuthInfo
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录