Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
315c9c37
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
315c9c37
编写于
5月 05, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(rpc): taosd exited when fqdn is configed to invalid
上级
ae33656e
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
67 addition
and
7 deletion
+67
-7
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+1
-0
include/os/osSocket.h
include/os/osSocket.h
+1
-0
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+7
-5
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+12
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+4
-1
source/os/src/osSocket.c
source/os/src/osSocket.c
+42
-0
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
315c9c37
...
...
@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
typedef
bool
(
*
RpcRfp
)(
int32_t
code
);
typedef
struct
SRpcInit
{
char
localFqdn
[
TSDB_FQDN_LEN
];
uint16_t
localPort
;
// local port
char
*
label
;
// for debug purpose
int
numOfThreads
;
// number of threads to handle connections
...
...
include/os/osSocket.h
浏览文件 @
315c9c37
...
...
@@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
TdSocketPtr
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
TdSocketPtr
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
bool
taosValidIpAndPort
(
uint32_t
ip
,
uint16_t
port
);
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int32_t
taosKeepTcpAlive
(
TdSocketPtr
pSocket
);
TdSocketPtr
taosAcceptTcpConnectSocket
(
TdSocketServerPtr
pServerSocket
,
struct
sockaddr
*
destAddr
,
int
*
addrLen
);
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
315c9c37
...
...
@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_USER
"_dnd"
#define INTERNAL_CKEY
"_key"
#define INTERNAL_SECRET "_pwd"
static
void
dmGetMnodeEpSet
(
SDnode
*
pDnode
,
SEpSet
*
pEpSet
)
{
...
...
@@ -130,10 +130,10 @@ _OVER:
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
...
...
@@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq
authReq
=
{
0
};
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
...
...
@@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
strncpy
(
rpcInit
.
localFqdn
,
pDnode
->
data
.
localFqdn
,
strlen
(
pDnode
->
data
.
localFqdn
));
rpcInit
.
localPort
=
pDnode
->
data
.
serverPort
;
rpcInit
.
label
=
"DND"
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
315c9c37
...
...
@@ -46,9 +46,20 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
uint32_t
ip
=
0
;
if
(
pInit
->
connType
==
TAOS_CONN_SERVER
)
{
ip
=
taosGetIpv4FromFqdn
(
pInit
->
localFqdn
);
if
(
ip
==
0xFFFFFFFF
)
{
tError
(
"invalid fqdn: %s"
,
pInit
->
localFqdn
);
taosMemoryFree
(
pRpc
);
return
NULL
;
}
}
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
tcphandle
=
(
*
taosInitHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
pRpc
->
tcphandle
=
(
*
taosInitHandle
[
pRpc
->
connType
])(
ip
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
if
(
pRpc
->
tcphandle
==
NULL
)
{
taosMemoryFree
(
pRpc
);
return
NULL
;
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
315c9c37
...
...
@@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
uv_os_sock_t
fds
[
2
];
if
(
uv_socketpair
(
SOCK_STREAM
,
0
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
goto
End
;
...
...
@@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
}
}
if
(
false
==
taosValidIpAndPort
(
srv
->
ip
,
srv
->
port
))
{
tError
(
"failed to bind, reason: %s"
,
strerror
(
errno
));
goto
End
;
}
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
goto
End
;
}
...
...
source/os/src/osSocket.c
浏览文件 @
315c9c37
...
...
@@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
return
0
;
}
bool
taosValidIpAndPort
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
SocketFd
fd
;
int32_t
reuse
;
// printf("open tcp server socket:0x%x:%hu", ip, port);
bzero
((
char
*
)
&
serverAdd
,
sizeof
(
serverAdd
));
serverAdd
.
sin_family
=
AF_INET
;
serverAdd
.
sin_addr
.
s_addr
=
ip
;
serverAdd
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
fd
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck1
(
fd
);
return
false
;
}
TdSocketPtr
pSocket
=
(
TdSocketPtr
)
taosMemoryMalloc
(
sizeof
(
TdSocket
));
if
(
pSocket
==
NULL
)
{
taosCloseSocketNoCheck1
(
fd
);
return
false
;
}
pSocket
->
refId
=
0
;
pSocket
->
fd
=
fd
;
/* set REUSEADDR option, so the portnumber can be re-used */
reuse
=
1
;
if
(
taosSetSockOpt
(
pSocket
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket
(
&
pSocket
);
return
NULL
;
}
/* bind socket to server address */
if
(
bind
(
pSocket
->
fd
,
(
struct
sockaddr
*
)
&
serverAdd
,
sizeof
(
serverAdd
))
<
0
)
{
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket
(
&
pSocket
);
return
false
;
}
taosCloseSocket
(
&
pSocket
);
return
true
;
}
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
SocketFd
fd
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录